You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by pw...@apache.org on 2021/08/05 07:32:21 UTC

[hudi] 02/03: [HUDI-457]Redo hudi-common log statements using SLF4J (#1161)

This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch redo-log
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit a0dc09ec06603a17ccf3962d2172c4bb87c1f3b4
Author: Li Jiaq <se...@gmail.com>
AuthorDate: Fri Jan 10 13:06:42 2020 +0800

    [HUDI-457]Redo hudi-common log statements using SLF4J (#1161)
---
 hudi-common/pom.xml                                |  6 ++++
 .../hudi/common/model/HoodieCommitMetadata.java    |  8 ++---
 .../hudi/common/model/HoodiePartitionMetadata.java | 11 +++----
 .../common/model/HoodieRollingStatMetadata.java    |  8 ++---
 .../hudi/common/table/HoodieTableConfig.java       |  8 ++---
 .../hudi/common/table/HoodieTableMetaClient.java   | 16 ++++-----
 .../table/log/AbstractHoodieLogRecordScanner.java  | 29 ++++++++---------
 .../hudi/common/table/log/HoodieLogFileReader.java | 12 +++----
 .../hudi/common/table/log/HoodieLogFormat.java     | 13 ++++----
 .../common/table/log/HoodieLogFormatReader.java    |  8 ++---
 .../common/table/log/HoodieLogFormatWriter.java    | 23 +++++++------
 .../table/log/HoodieMergedLogRecordScanner.java    | 16 ++++-----
 .../table/timeline/HoodieActiveTimeline.java       | 30 ++++++++---------
 .../table/timeline/HoodieArchivedTimeline.java     |  4 ---
 .../table/timeline/HoodieDefaultTimeline.java      |  4 ---
 .../table/view/AbstractTableFileSystemView.java    | 19 +++++------
 .../common/table/view/FileSystemViewManager.java   | 15 ++++-----
 .../table/view/HoodieTableFileSystemView.java      |  8 ++---
 .../IncrementalTimelineSyncFileSystemView.java     | 38 +++++++++++-----------
 .../table/view/PriorityBasedFileSystemView.java    |  6 ++--
 .../view/RemoteHoodieTableFileSystemView.java      |  8 ++---
 .../table/view/RocksDbBasedFileSystemView.java     | 26 +++++++--------
 .../view/SpillableMapBasedFileSystemView.java      | 14 ++++----
 .../apache/hudi/common/util/CompactionUtils.java   |  5 ---
 .../common/util/DFSPropertiesConfiguration.java    |  8 ++---
 .../java/org/apache/hudi/common/util/FSUtils.java  | 12 +++----
 .../hudi/common/util/FailSafeConsistencyGuard.java | 14 ++++----
 .../common/util/HoodieRecordSizeEstimator.java     |  8 ++---
 .../org/apache/hudi/common/util/RocksDBDAO.java    | 18 +++++-----
 .../hudi/common/util/TimelineDiffHelper.java       | 10 +++---
 .../hudi/common/util/collection/DiskBasedMap.java  | 11 +++----
 .../util/collection/ExternalSpillableMap.java      | 10 +++---
 .../common/util/queue/BoundedInMemoryExecutor.java |  6 ++--
 .../common/util/queue/BoundedInMemoryQueue.java    |  6 ++--
 .../util/queue/FunctionBasedQueueProducer.java     |  6 ++--
 .../util/queue/IteratorBasedQueueProducer.java     |  6 ++--
 .../hudi/common/minicluster/HdfsTestService.java   | 10 +++---
 .../common/minicluster/ZookeeperTestService.java   | 10 +++---
 .../table/view/TestHoodieTableFileSystemView.java  |  8 ++---
 39 files changed, 232 insertions(+), 246 deletions(-)

diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index c9aaf7a..f153119 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -166,6 +166,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>com.github.stefanbirkner</groupId>
       <artifactId>system-rules</artifactId>
       <version>1.16.0</version>
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 475f75c..9a69545 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -25,8 +25,8 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.PropertyAccessor;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -43,7 +43,7 @@ import java.util.Map;
 public class HoodieCommitMetadata implements Serializable {
 
   public static final String SCHEMA_KEY = "schema";
-  private static final Logger LOG = LogManager.getLogger(HoodieCommitMetadata.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieCommitMetadata.class);
   protected Map<String, List<HoodieWriteStat>> partitionToWriteStats;
   protected Boolean compacted;
 
@@ -118,7 +118,7 @@ public class HoodieCommitMetadata implements Serializable {
 
   public String toJsonString() throws IOException {
     if (partitionToWriteStats.containsKey(null)) {
-      LOG.info("partition path is null for " + partitionToWriteStats.get(null));
+      LOG.info("partition path is null for {}", partitionToWriteStats.get(null));
       partitionToWriteStats.remove(null);
     }
     return getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index 013869c..7fd6cda 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -24,8 +24,8 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Properties;
@@ -51,7 +51,7 @@ public class HoodiePartitionMetadata {
 
   private final FileSystem fs;
 
-  private static final Logger LOG = LogManager.getLogger(HoodiePartitionMetadata.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodiePartitionMetadata.class);
 
   /**
    * Construct metadata from existing partition.
@@ -101,8 +101,7 @@ public class HoodiePartitionMetadata {
         fs.rename(tmpMetaPath, metaPath);
       }
     } catch (IOException ioe) {
-      LOG.warn("Error trying to save partition metadata (this is okay, as long as atleast 1 of these succced), "
-          + partitionPath, ioe);
+      LOG.warn("Error trying to save partition metadata (this is okay, as long as atleast 1 of these succced), {}", partitionPath, ioe);
     } finally {
       if (!metafileExists) {
         try {
@@ -111,7 +110,7 @@ public class HoodiePartitionMetadata {
             fs.delete(tmpMetaPath, false);
           }
         } catch (IOException ioe) {
-          LOG.warn("Error trying to clean up temporary files for " + partitionPath, ioe);
+          LOG.warn("Error trying to clean up temporary files for {}", partitionPath, ioe);
         }
       }
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRollingStatMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRollingStatMetadata.java
index bd1ef94..0459eb5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRollingStatMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRollingStatMetadata.java
@@ -18,8 +18,8 @@
 
 package org.apache.hudi.common.model;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -31,7 +31,7 @@ import java.util.Map;
  */
 public class HoodieRollingStatMetadata implements Serializable {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieRollingStatMetadata.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieRollingStatMetadata.class);
   protected Map<String, Map<String, HoodieRollingStat>> partitionToRollingStats;
   private String actionType = "DUMMY_ACTION";
   public static final String ROLLING_STAT_METADATA_KEY = "ROLLING_STAT";
@@ -78,7 +78,7 @@ public class HoodieRollingStatMetadata implements Serializable {
 
   public String toJsonString() throws IOException {
     if (partitionToRollingStats.containsKey(null)) {
-      LOG.info("partition path is null for " + partitionToRollingStats.get(null));
+      LOG.info("partition path is null for {}", partitionToRollingStats.get(null));
       partitionToRollingStats.remove(null);
     }
     return HoodieCommitMetadata.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index cc950f7..e08a3b8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -28,8 +28,8 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -48,7 +48,7 @@ import java.util.stream.Collectors;
  */
 public class HoodieTableConfig implements Serializable {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieTableConfig.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieTableConfig.class);
 
   public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties";
   public static final String HOODIE_TABLE_NAME_PROP_NAME = "hoodie.table.name";
@@ -71,7 +71,7 @@ public class HoodieTableConfig implements Serializable {
   public HoodieTableConfig(FileSystem fs, String metaPath) {
     Properties props = new Properties();
     Path propertyPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
-    LOG.info("Loading dataset properties from " + propertyPath);
+    LOG.info("Loading dataset properties from {}", propertyPath);
     try {
       try (FSDataInputStream inputStream = fs.open(propertyPath)) {
         props.load(inputStream);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 489b204..6f5feae 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -39,8 +39,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -66,7 +66,7 @@ import java.util.stream.Stream;
  */
 public class HoodieTableMetaClient implements Serializable {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieTableMetaClient.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieTableMetaClient.class);
   public static String METAFOLDER_NAME = ".hoodie";
   public static String TEMPFOLDER_NAME = METAFOLDER_NAME + File.separator + ".temp";
   public static String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + File.separator + ".aux";
@@ -97,7 +97,7 @@ public class HoodieTableMetaClient implements Serializable {
   public HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
       ConsistencyGuardConfig consistencyGuardConfig, Option<TimelineLayoutVersion> layoutVersion)
       throws DatasetNotFoundException {
-    LOG.info("Loading HoodieTableMetaClient from " + basePath);
+    LOG.info("Loading HoodieTableMetaClient from {}", basePath);
     this.basePath = basePath;
     this.consistencyGuardConfig = consistencyGuardConfig;
     this.hadoopConf = new SerializableConfiguration(conf);
@@ -110,9 +110,9 @@ public class HoodieTableMetaClient implements Serializable {
     this.tableType = tableConfig.getTableType();
     this.timelineLayoutVersion = layoutVersion.orElse(tableConfig.getTimelineLayoutVersion());
     this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad;
-    LOG.info("Finished Loading Table of type " + tableType + "(version=" + timelineLayoutVersion + ") from " + basePath);
+    LOG.info("Finished Loading Table of type {}(version={}) from {}", tableType, timelineLayoutVersion, basePath);
     if (loadActiveTimelineOnLoad) {
-      LOG.info("Loading Active commit timeline for " + basePath);
+      LOG.info("Loading Active commit timeline for {}", basePath);
       getActiveTimeline();
     }
   }
@@ -324,7 +324,7 @@ public class HoodieTableMetaClient implements Serializable {
    */
   public static HoodieTableMetaClient initDatasetAndGetMetaClient(Configuration hadoopConf, String basePath,
       Properties props) throws IOException {
-    LOG.info("Initializing " + basePath + " as hoodie dataset " + basePath);
+    LOG.info("Initializing {} as hoodie dataset", basePath);
     Path basePathDir = new Path(basePath);
     final FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
     if (!fs.exists(basePathDir)) {
@@ -361,7 +361,7 @@ public class HoodieTableMetaClient implements Serializable {
     // We should not use fs.getConf as this might be different from the original configuration
     // used to create the fs in unit tests
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
-    LOG.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType() + " from " + basePath);
+    LOG.info("Finished initializing Table of type {} from {}", metaClient.getTableConfig().getTableType(), basePath);
     return metaClient;
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 42f1f9a..2e783f3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -36,8 +36,8 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -66,7 +66,7 @@ import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlo
  */
 public abstract class AbstractHoodieLogRecordScanner {
 
-  private static final Logger LOG = LogManager.getLogger(AbstractHoodieLogRecordScanner.class);
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractHoodieLogRecordScanner.class);
 
   // Reader schema for the records
   protected final Schema readerSchema;
@@ -131,7 +131,7 @@ public abstract class AbstractHoodieLogRecordScanner {
       Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
       while (logFormatReaderWrapper.hasNext()) {
         HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
-        LOG.info("Scanning log file " + logFile);
+        LOG.info("Scanning log file {}", logFile);
         scannedLogFiles.add(logFile);
         totalLogFiles.set(scannedLogFiles.size());
         // Use the HoodieLogFileReader to iterate through the blocks in the log file
@@ -145,7 +145,7 @@ public abstract class AbstractHoodieLogRecordScanner {
         }
         switch (r.getBlockType()) {
           case AVRO_DATA_BLOCK:
-            LOG.info("Reading a data block from file " + logFile.getPath());
+            LOG.info("Reading a data block from file {}", logFile.getPath());
             if (isNewInstantBlock(r) && !readBlocksLazily) {
               // If this is an avro data block belonging to a different commit/instant,
               // then merge the last blocks and records into the main result
@@ -155,7 +155,7 @@ public abstract class AbstractHoodieLogRecordScanner {
             currentInstantLogBlocks.push(r);
             break;
           case DELETE_BLOCK:
-            LOG.info("Reading a delete block from file " + logFile.getPath());
+            LOG.info("Reading a delete block from file {}", logFile.getPath());
             if (isNewInstantBlock(r) && !readBlocksLazily) {
               // If this is a delete data block belonging to a different commit/instant,
               // then merge the last blocks and records into the main result
@@ -177,7 +177,7 @@ public abstract class AbstractHoodieLogRecordScanner {
             // written per ingestion batch for a file but in reality we need to rollback (B1 & B2)
             // The following code ensures the same rollback block (R1) is used to rollback
             // both B1 & B2
-            LOG.info("Reading a command block from file " + logFile.getPath());
+            LOG.info("Reading a command block from file {}", logFile.getPath());
             // This is a command block - take appropriate action based on the command
             HoodieCommandBlock commandBlock = (HoodieCommandBlock) r;
             String targetInstantForCommandBlock =
@@ -196,34 +196,33 @@ public abstract class AbstractHoodieLogRecordScanner {
                   HoodieLogBlock lastBlock = currentInstantLogBlocks.peek();
                   // handle corrupt blocks separately since they may not have metadata
                   if (lastBlock.getBlockType() == CORRUPT_BLOCK) {
-                    LOG.info("Rolling back the last corrupted log block read in " + logFile.getPath());
+                    LOG.info("Rolling back the last corrupted log block read in {}", logFile.getPath());
                     currentInstantLogBlocks.pop();
                     numBlocksRolledBack++;
                   } else if (lastBlock.getBlockType() != CORRUPT_BLOCK
                       && targetInstantForCommandBlock.contentEquals(lastBlock.getLogBlockHeader().get(INSTANT_TIME))) {
                     // rollback last data block or delete block
-                    LOG.info("Rolling back the last log block read in " + logFile.getPath());
+                    LOG.info("Rolling back the last log block read in {}", logFile.getPath());
                     currentInstantLogBlocks.pop();
                     numBlocksRolledBack++;
                   } else if (!targetInstantForCommandBlock
                       .contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME))) {
                     // invalid or extra rollback block
-                    LOG.warn("TargetInstantTime " + targetInstantForCommandBlock
-                        + " invalid or extra rollback command block in " + logFile.getPath());
+                    LOG.warn("TargetInstantTime {} invalid or extra rollback command block in {}", targetInstantForCommandBlock, logFile.getPath());
                     break;
                   } else {
                     // this should not happen ideally
-                    LOG.warn("Unable to apply rollback command block in " + logFile.getPath());
+                    LOG.warn("Unable to apply rollback command block in {}", logFile.getPath());
                   }
                 }
-                LOG.info("Number of applied rollback blocks " + numBlocksRolledBack);
+                LOG.info("Number of applied rollback blocks {}", numBlocksRolledBack);
                 break;
               default:
                 throw new UnsupportedOperationException("Command type not yet supported.");
             }
             break;
           case CORRUPT_BLOCK:
-            LOG.info("Found a corrupt block in " + logFile.getPath());
+            LOG.info("Found a corrupt block in {}", logFile.getPath());
             totalCorruptBlocks.incrementAndGet();
             // If there is a corrupt block - we will assume that this was the next data block
             currentInstantLogBlocks.push(r);
@@ -297,7 +296,7 @@ public abstract class AbstractHoodieLogRecordScanner {
    */
   private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> lastBlocks, int numLogFilesSeen) throws Exception {
     while (!lastBlocks.isEmpty()) {
-      LOG.info("Number of remaining logblocks to merge " + lastBlocks.size());
+      LOG.info("Number of remaining logblocks to merge {}", lastBlocks.size());
       // poll the element at the bottom of the stack since that's the order it was inserted
       HoodieLogBlock lastBlock = lastBlocks.pollLast();
       switch (lastBlock.getBlockType()) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 354f809..994941a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -38,8 +38,8 @@ import org.apache.hadoop.fs.BufferedFSInputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -54,7 +54,7 @@ import java.util.Map;
 class HoodieLogFileReader implements HoodieLogFormat.Reader {
 
   public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB
-  private static final Logger LOG = LogManager.getLogger(HoodieLogFileReader.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieLogFileReader.class);
 
   private final FSDataInputStream inputStream;
   private final HoodieLogFile logFile;
@@ -113,7 +113,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
         try {
           close();
         } catch (Exception e) {
-          LOG.warn("unable to close input stream for log file " + logFile, e);
+          LOG.warn("unable to close input stream for log file {}", logFile, e);
           // fail silently for any sort of exception
         }
       }
@@ -211,12 +211,12 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
   }
 
   private HoodieLogBlock createCorruptBlock() throws IOException {
-    LOG.info("Log " + logFile + " has a corrupted block at " + inputStream.getPos());
+    LOG.info("Log {} has a corrupted block at {}", logFile, inputStream.getPos());
     long currentPos = inputStream.getPos();
     long nextBlockOffset = scanForNextAvailableBlockOffset();
     // Rewind to the initial start and read corrupted bytes till the nextBlockOffset
     inputStream.seek(currentPos);
-    LOG.info("Next available block in " + logFile + " starts at " + nextBlockOffset);
+    LOG.info("Next available block in {} starts at {}", logFile, nextBlockOffset);
     int corruptedBlockSize = (int) (nextBlockOffset - currentPos);
     long contentPosition = inputStream.getPos();
     byte[] corruptedBytes = HoodieLogBlock.readOrSkipContent(inputStream, corruptedBlockSize, readBlockLazily);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
index a7165fb..69228ca 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
@@ -27,8 +27,8 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -104,7 +104,7 @@ public interface HoodieLogFormat {
    */
   class WriterBuilder {
 
-    private static final Logger LOG = LogManager.getLogger(WriterBuilder.class);
+    private static final Logger LOG = LoggerFactory.getLogger(WriterBuilder.class);
     // Default max log file size 512 MB
     public static final long DEFAULT_SIZE_THRESHOLD = 512 * 1024 * 1024L;
 
@@ -210,7 +210,7 @@ public interface HoodieLogFormat {
       }
 
       if (logVersion == null) {
-        LOG.info("Computing the next log version for " + logFileId + " in " + parentPath);
+        LOG.info("Computing the next log version for {} in {}", logFileId, parentPath);
         Option<Pair<Integer, String>> versionAndWriteToken =
             FSUtils.getLatestLogVersion(fs, parentPath, logFileId, fileExtension, commitTime);
         if (versionAndWriteToken.isPresent()) {
@@ -222,8 +222,7 @@ public interface HoodieLogFormat {
           // Use rollover write token as write token to create new log file with tokens
           logWriteToken = rolloverLogWriteToken;
         }
-        LOG.info("Computed the next log version for " + logFileId + " in " + parentPath + " as " + logVersion
-            + " with write-token " + logWriteToken);
+        LOG.info("Computed the next log version for {} in {} as {} with write-token {}", logFileId, parentPath, logVersion, logWriteToken);
       }
 
       if (logWriteToken == null) {
@@ -234,7 +233,7 @@ public interface HoodieLogFormat {
 
       Path logPath = new Path(parentPath,
           FSUtils.makeLogFileName(logFileId, fileExtension, commitTime, logVersion, logWriteToken));
-      LOG.info("HoodieLogFile on path " + logPath);
+      LOG.info("HoodieLogFile on path {}", logPath);
       HoodieLogFile logFile = new HoodieLogFile(logPath);
 
       if (bufferSize == null) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
index a5834d2..b7bb7ae 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
@@ -24,8 +24,8 @@ import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -46,7 +46,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
   private final boolean reverseLogReader;
   private int bufferSize;
 
-  private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieLogFormatReader.class);
 
   HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, Schema readerSchema, boolean readBlocksLazily,
       boolean reverseLogReader, int bufferSize) throws IOException {
@@ -103,7 +103,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
       } catch (IOException io) {
         throw new HoodieIOException("unable to initialize read with log file ", io);
       }
-      LOG.info("Moving to the next reader for logfile " + currentReader.getLogFile());
+      LOG.info("Moving to the next reader for logfile {}", currentReader.getLogFile());
       return this.currentReader.hasNext();
     }
     return false;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index b8d5f89..25ec235 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -34,8 +34,8 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -44,7 +44,7 @@ import java.io.IOException;
  */
 public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieLogFormatWriter.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieLogFormatWriter.class);
 
   private HoodieLogFile logFile;
   private final FileSystem fs;
@@ -76,7 +76,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
     if (fs.exists(path)) {
       boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme());
       if (isAppendSupported) {
-        LOG.info(logFile + " exists. Appending to existing file");
+        LOG.info("{} exists. Appending to existing file", logFile);
         try {
           this.output = fs.append(path, bufferSize);
         } catch (RemoteException e) {
@@ -93,11 +93,11 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
       }
       if (!isAppendSupported) {
         this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
-        LOG.info("Append not supported.. Rolling over to " + logFile);
+        LOG.info("Append not supported.. Rolling over to {}", logFile);
         createNewFile();
       }
     } else {
-      LOG.info(logFile + " does not exist. Create a new file");
+      LOG.info("{} does not exist. Create a new file", logFile);
       // Block size does not matter as we will always manually autoflush
       createNewFile();
     }
@@ -180,8 +180,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
     if (getCurrentSize() > sizeThreshold) {
       // TODO - make an end marker which seals the old log file (no more appends possible to that
       // file).
-      LOG.info("CurrentSize " + getCurrentSize() + " has reached threshold " + sizeThreshold
-          + ". Rolling over to the next version");
+      LOG.info("CurrentSize {} has reached threshold {}. Rolling over to the next version", getCurrentSize(), sizeThreshold);
       HoodieLogFile newLogFile = logFile.rollOver(fs, rolloverLogWriteToken);
       // close this writer and return the new writer
       close();
@@ -237,7 +236,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
       this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
       createNewFile();
     } else if (e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName())) {
-      LOG.warn("Another task executor writing to the same log file(" + logFile + ". Rolling over");
+      LOG.warn("Another task executor writing to the same log file({}. Rolling over", logFile);
       // Rollover the current log file (since cannot get a stream handle) and create new one
       this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
       createNewFile();
@@ -246,13 +245,13 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
       // this happens when either another task executor writing to this file died or
       // data node is going down. Note that we can only try to recover lease for a DistributedFileSystem.
       // ViewFileSystem unfortunately does not support this operation
-      LOG.warn("Trying to recover log on path " + path);
+      LOG.warn("Trying to recover log on path {}", path);
       if (FSUtils.recoverDFSFileLease((DistributedFileSystem) fs, path)) {
-        LOG.warn("Recovered lease on path " + path);
+        LOG.warn("Recovered lease on path {}", path);
         // try again
         this.output = fs.append(path, bufferSize);
       } else {
-        LOG.warn("Failed to recover lease on path " + path);
+        LOG.warn("Failed to recover lease on path {}", path);
         throw new HoodieException(e);
       }
     } else {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index e6246c4..228af19 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -30,8 +30,8 @@ import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -54,7 +54,7 @@ import java.util.Map;
 public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
     implements Iterable<HoodieRecord<? extends HoodieRecordPayload>> {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieMergedLogRecordScanner.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieMergedLogRecordScanner.class);
 
   // Final map of compacted/merged records
   private final ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records;
@@ -81,12 +81,12 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
       scan();
       this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
       this.numMergedRecordsInLog = records.size();
-      LOG.info("MaxMemoryInBytes allowed for compaction => " + maxMemorySizeInBytes);
-      LOG.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => " + records.getInMemoryMapNumEntries());
+      LOG.info("MaxMemoryInBytes allowed for compaction => {}", maxMemorySizeInBytes);
+      LOG.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => {}", records.getInMemoryMapNumEntries());
       LOG.info(
-          "Total size in bytes of MemoryBasedMap in ExternalSpillableMap => " + records.getCurrentInMemoryMapSize());
-      LOG.info("Number of entries in DiskBasedMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries());
-      LOG.info("Size of file spilled to disk => " + records.getSizeOfFileOnDiskInBytes());
+          "Total size in bytes of MemoryBasedMap in ExternalSpillableMap => {}", records.getCurrentInMemoryMapSize());
+      LOG.info("Number of entries in DiskBasedMap in ExternalSpillableMap => {}", records.getDiskBasedMapNumEntries());
+      LOG.info("Size of file spilled to disk => {}", records.getSizeOfFileOnDiskInBytes());
     } catch (IOException e) {
       throw new HoodieIOException("IOException when reading log file ");
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index cdf3764..ff50330 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -31,8 +31,8 @@ import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -68,7 +68,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
           INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION,
           INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION}));
 
-  private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieActiveTimeline.class);
   protected HoodieTableMetaClient metaClient;
   private static AtomicReference<String> lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE));
 
@@ -103,7 +103,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     // multiple casts will make this lambda serializable -
     // http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
     this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails;
-    LOG.info("Loaded instants " + getInstants().collect(Collectors.toList()));
+    LOG.info("Loaded instants {}", getInstants().collect(Collectors.toList()));
   }
 
   public HoodieActiveTimeline(HoodieTableMetaClient metaClient) {
@@ -221,24 +221,24 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
   }
 
   public void createNewInstant(HoodieInstant instant) {
-    LOG.info("Creating a new instant " + instant);
+    LOG.info("Creating a new instant {}", instant);
     // Create the in-flight file
     createFileInMetaPath(instant.getFileName(), Option.empty(), false);
   }
 
   public void saveAsComplete(HoodieInstant instant, Option<byte[]> data) {
-    LOG.info("Marking instant complete " + instant);
+    LOG.info("Marking instant complete {}", instant);
     Preconditions.checkArgument(instant.isInflight(),
-        "Could not mark an already completed instant as complete again " + instant);
+            "Could not mark an already completed instant as complete again " + instant);
     transitionState(instant, HoodieTimeline.getCompletedInstant(instant), data);
-    LOG.info("Completed " + instant);
+    LOG.info("Completed {}", instant);
   }
 
   public HoodieInstant revertToInflight(HoodieInstant instant) {
-    LOG.info("Reverting instant to inflight " + instant);
+    LOG.info("Reverting instant to inflight {}", instant);
     HoodieInstant inflight = HoodieTimeline.getInflightInstant(instant, metaClient.getTableType());
     revertCompleteToInflight(instant, inflight);
-    LOG.info("Reverted " + instant + " to inflight " + inflight);
+    LOG.info("Reverted {} to inflight {}", instant, inflight);
     return inflight;
   }
 
@@ -259,12 +259,12 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
   }
 
   private void deleteInstantFile(HoodieInstant instant) {
-    LOG.info("Deleting instant " + instant);
+    LOG.info("Deleting instant {}", instant);
     Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), instant.getFileName());
     try {
       boolean result = metaClient.getFs().delete(inFlightCommitFilePath, false);
       if (result) {
-        LOG.info("Removed instant " + instant);
+        LOG.info("Removed instant {}", instant);
       } else {
         throw new HoodieIOException("Could not delete instant " + instant);
       }
@@ -405,12 +405,12 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
         }
       } else {
         // Ensures old state exists in timeline
-        LOG.info("Checking for file exists ?" + new Path(metaClient.getMetaPath(), fromInstant.getFileName()));
+        LOG.info("Checking for file exists ?{}", new Path(metaClient.getMetaPath(), fromInstant.getFileName()));
         Preconditions.checkArgument(metaClient.getFs().exists(new Path(metaClient.getMetaPath(),
             fromInstant.getFileName())));
         // Use Write Once to create Target File
         createImmutableFileInPath(new Path(metaClient.getMetaPath(), toInstant.getFileName()), data);
-        LOG.info("Create new file for toInstant ?" + new Path(metaClient.getMetaPath(), toInstant.getFileName()));
+        LOG.info("Create new file for toInstant ?{}", new Path(metaClient.getMetaPath(), toInstant.getFileName()));
       }
     } catch (IOException e) {
       throw new HoodieIOException("Could not complete " + fromInstant, e);
@@ -491,7 +491,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
       // If the path does not exist, create it first
       if (!metaClient.getFs().exists(fullPath)) {
         if (metaClient.getFs().createNewFile(fullPath)) {
-          LOG.info("Created a new file in meta path: " + fullPath);
+          LOG.info("Created a new file in meta path: {}", fullPath);
         } else {
           throw new HoodieIOException("Failed to create file " + fullPath);
         }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index d26a88f..a7a10e4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -26,8 +26,6 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -54,8 +52,6 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
   private HoodieTableMetaClient metaClient;
   private Map<String, byte[]> readCommits = new HashMap<>();
 
-  private static final Logger LOG = LogManager.getLogger(HoodieArchivedTimeline.class);
-
   public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
     // Read back the commits to make sure
     Path archiveLogPath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index c61355c..4030fce 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -25,8 +25,6 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
 
 import com.google.common.collect.Sets;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
@@ -47,8 +45,6 @@ import static java.util.Collections.reverse;
  */
 public class HoodieDefaultTimeline implements HoodieTimeline {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieDefaultTimeline.class);
-
   private static final String HASHING_ALGORITHM = "SHA-256";
 
   protected transient Function<HoodieInstant, Option<byte[]>> details;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index bd7462d..ea05782 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -38,8 +38,8 @@ import org.apache.hudi.exception.HoodieIOException;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -66,7 +66,7 @@ import java.util.stream.Stream;
  */
 public abstract class AbstractTableFileSystemView implements SyncableFileSystemView, Serializable {
 
-  private static final Logger LOG = LogManager.getLogger(AbstractTableFileSystemView.class);
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractTableFileSystemView.class);
 
   protected HoodieTableMetaClient metaClient;
 
@@ -123,8 +123,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
       }
     });
     long storePartitionsTs = timer.endTimer();
-    LOG.info("addFilesToView: NumFiles=" + statuses.length + ", FileGroupsCreationTime=" + fgBuildTimeTakenMs
-        + ", StoreTimeTaken=" + storePartitionsTs);
+    LOG.info("addFilesToView: NumFiles={}, FileGroupsCreationTime={}, StoreTimeTaken={}", statuses.length, fgBuildTimeTakenMs, storePartitionsTs);
     return fileGroups;
   }
 
@@ -217,7 +216,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
       if (!isPartitionAvailableInStore(partitionPathStr)) {
         // Not loaded yet
         try {
-          LOG.info("Building file system view for partition (" + partitionPathStr + ")");
+          LOG.info("Building file system view for partition ({})", partitionPathStr);
 
           // Create the path if it does not exist already
           Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPathStr);
@@ -236,10 +235,10 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
           throw new HoodieIOException("Failed to list data files in partition " + partitionPathStr, e);
         }
       } else {
-        LOG.debug("View already built for Partition :" + partitionPathStr + ", FOUND is ");
+        LOG.debug("View already built for Partition :{}, FOUND is ", partitionPathStr);
       }
       long endTs = System.currentTimeMillis();
-      LOG.info("Time to load partition (" + partitionPathStr + ") =" + (endTs - beginTs));
+      LOG.info("Time to load partition ({}) = {}", partitionPathStr, (endTs - beginTs));
       return true;
     });
   }
@@ -290,7 +289,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
   protected boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) {
     Option<Pair<String, CompactionOperation>> compactionWithInstantTime =
         getPendingCompactionOperationWithInstant(fileSlice.getFileGroupId());
-    LOG.info("Pending Compaction instant for (" + fileSlice + ") is :" + compactionWithInstantTime);
+    LOG.info("Pending Compaction instant for ({}) is :{}", fileSlice, compactionWithInstantTime);
     return (compactionWithInstantTime.isPresent())
         && fileSlice.getBaseInstantTime().equals(compactionWithInstantTime.get().getKey());
   }
@@ -303,7 +302,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
    */
   protected FileSlice filterDataFileAfterPendingCompaction(FileSlice fileSlice) {
     if (isFileSliceAfterPendingCompaction(fileSlice)) {
-      LOG.info("File Slice (" + fileSlice + ") is in pending compaction");
+      LOG.info("File Slice ({}) is in pending compaction", fileSlice);
       // Data file is filtered out of the file-slice as the corresponding compaction
       // instant not completed yet.
       FileSlice transformed =
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
index e00b2c7..078f394 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
@@ -24,8 +24,8 @@ import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.table.SyncableFileSystemView;
 import org.apache.hudi.common.util.Functions.Function2;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -51,7 +51,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * clients for querying.
  */
 public class FileSystemViewManager {
-  private static final Logger LOG = LogManager.getLogger(FileSystemViewManager.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FileSystemViewManager.class);
 
   private final SerializableConfiguration conf;
   // The View Storage config used to store file-system views
@@ -126,7 +126,7 @@ public class FileSystemViewManager {
    */
   private static SpillableMapBasedFileSystemView createSpillableMapBasedFileSystemView(SerializableConfiguration conf,
       FileSystemViewStorageConfig viewConf, String basePath) {
-    LOG.info("Creating SpillableMap based view for basePath " + basePath);
+    LOG.info("Creating SpillableMap based view for basePath {}", basePath);
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf.newCopy(), basePath, true);
     HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
     return new SpillableMapBasedFileSystemView(metaClient, timeline, viewConf);
@@ -142,7 +142,7 @@ public class FileSystemViewManager {
    */
   private static HoodieTableFileSystemView createInMemoryFileSystemView(SerializableConfiguration conf,
       FileSystemViewStorageConfig viewConf, String basePath) {
-    LOG.info("Creating InMemory based view for basePath " + basePath);
+    LOG.info("Creating InMemory based view for basePath {}", basePath);
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf.newCopy(), basePath, true);
     HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
     return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled());
@@ -158,8 +158,7 @@ public class FileSystemViewManager {
    */
   private static RemoteHoodieTableFileSystemView createRemoteFileSystemView(SerializableConfiguration conf,
       FileSystemViewStorageConfig viewConf, HoodieTableMetaClient metaClient) {
-    LOG.info("Creating remote view for basePath " + metaClient.getBasePath() + ". Server="
-        + viewConf.getRemoteViewServerHost() + ":" + viewConf.getRemoteViewServerPort());
+    LOG.info("Creating remote view for basePath {}. Server={}:{}", metaClient.getBasePath(), viewConf.getRemoteViewServerHost(), viewConf.getRemoteViewServerPort());
     return new RemoteHoodieTableFileSystemView(viewConf.getRemoteViewServerHost(), viewConf.getRemoteViewServerPort(),
         metaClient);
   }
@@ -173,7 +172,7 @@ public class FileSystemViewManager {
    */
   public static FileSystemViewManager createViewManager(final SerializableConfiguration conf,
       final FileSystemViewStorageConfig config) {
-    LOG.info("Creating View Manager with storage type :" + config.getStorageType());
+    LOG.info("Creating View Manager with storage type :{}", config.getStorageType());
     switch (config.getStorageType()) {
       case EMBEDDED_KV_STORE:
         LOG.info("Creating embedded rocks-db based Table View");
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
index dd71124..c9311ad 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
@@ -29,8 +29,8 @@ import org.apache.hudi.common.util.collection.Pair;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -48,7 +48,7 @@ import java.util.stream.Stream;
  */
 public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystemView {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieTableFileSystemView.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieTableFileSystemView.class);
 
   // mapping from partition paths to file groups contained within them
   protected Map<String, List<HoodieFileGroup>> partitionToFileGroupsMap;
@@ -193,7 +193,7 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem
 
   @Override
   protected void storePartitionView(String partitionPath, List<HoodieFileGroup> fileGroups) {
-    LOG.info("Adding file-groups for partition :" + partitionPath + ", #FileGroups=" + fileGroups.size());
+    LOG.info("Adding file-groups for partition :{}, #FileGroups={}", partitionPath, fileGroups.size());
     List<HoodieFileGroup> newList = new ArrayList<>(fileGroups);
     partitionToFileGroupsMap.put(partitionPath, newList);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
index 19209ac..a11b75b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
@@ -42,8 +42,8 @@ import org.apache.hudi.exception.HoodieException;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
@@ -55,7 +55,7 @@ import java.util.stream.Collectors;
  */
 public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTableFileSystemView {
 
-  private static final Logger LOG = LogManager.getLogger(IncrementalTimelineSyncFileSystemView.class);
+  private static final Logger LOG = LoggerFactory.getLogger(IncrementalTimelineSyncFileSystemView.class);
 
   // Allows incremental Timeline syncing
   private final boolean incrementalTimelineSyncEnabled;
@@ -103,7 +103,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
    */
   private void runIncrementalSync(HoodieTimeline timeline, TimelineDiffResult diffResult) {
 
-    LOG.info("Timeline Diff Result is :" + diffResult);
+    LOG.info("Timeline Diff Result is :{}", diffResult);
 
     // First remove pending compaction instants which were completed
     diffResult.getFinishedCompactionInstants().stream().forEach(instant -> {
@@ -144,7 +144,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
    * @param instant Compaction Instant to be removed
    */
   private void removePendingCompactionInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
-    LOG.info("Removing completed compaction instant (" + instant + ")");
+    LOG.info("Removing completed compaction instant ({})", instant);
     HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp());
     removePendingCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant, plan)
         .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
@@ -158,7 +158,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
    * @param instant Compaction Instant
    */
   private void addPendingCompactionInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
-    LOG.info("Syncing pending compaction instant (" + instant + ")");
+    LOG.info("Syncing pending compaction instant ({})", instant);
     HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp());
     List<Pair<String, CompactionOperation>> pendingOps =
         CompactionUtils.getPendingCompactionOperations(instant, compactionPlan)
@@ -189,13 +189,13 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
    * @param instant Instant
    */
   private void addCommitInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
-    LOG.info("Syncing committed instant (" + instant + ")");
+    LOG.info("Syncing committed instant ({})", instant);
     HoodieCommitMetadata commitMetadata =
         HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
     commitMetadata.getPartitionToWriteStats().entrySet().stream().forEach(entry -> {
       String partition = entry.getKey();
       if (isPartitionAvailableInStore(partition)) {
-        LOG.info("Syncing partition (" + partition + ") of instant (" + instant + ")");
+        LOG.info("Syncing partition ({}) of instant ({})", partition, instant);
         FileStatus[] statuses = entry.getValue().stream().map(p -> {
           FileStatus status = new FileStatus(p.getFileSizeInBytes(), false, 0, 0, 0, 0, null, null, null,
               new Path(String.format("%s/%s", metaClient.getBasePath(), p.getPath())));
@@ -205,10 +205,10 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
             buildFileGroups(statuses, timeline.filterCompletedAndCompactionInstants(), false);
         applyDeltaFileSlicesToPartitionView(partition, fileGroups, DeltaApplyMode.ADD);
       } else {
-        LOG.warn("Skipping partition (" + partition + ") when syncing instant (" + instant + ") as it is not loaded");
+        LOG.warn("Skipping partition ({}) when syncing instant ({}) as it is not loaded", partition, instant);
       }
     });
-    LOG.info("Done Syncing committed instant (" + instant + ")");
+    LOG.info("Done Syncing committed instant ({})", instant);
   }
 
   /**
@@ -218,7 +218,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
    * @param instant Restore Instant
    */
   private void addRestoreInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
-    LOG.info("Syncing restore instant (" + instant + ")");
+    LOG.info("Syncing restore instant ({})", instant);
     HoodieRestoreMetadata metadata =
         AvroUtils.deserializeAvroMetadata(timeline.getInstantDetails(instant).get(), HoodieRestoreMetadata.class);
 
@@ -232,7 +232,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
       removeFileSlicesForPartition(timeline, instant, e.getKey(),
           e.getValue().stream().map(x -> x.getValue()).collect(Collectors.toList()));
     });
-    LOG.info("Done Syncing restore instant (" + instant + ")");
+    LOG.info("Done Syncing restore instant ({})", instant);
   }
 
   /**
@@ -242,14 +242,14 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
    * @param instant Rollback Instant
    */
   private void addRollbackInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
-    LOG.info("Syncing rollback instant (" + instant + ")");
+    LOG.info("Syncing rollback instant ({})", instant);
     HoodieRollbackMetadata metadata =
         AvroUtils.deserializeAvroMetadata(timeline.getInstantDetails(instant).get(), HoodieRollbackMetadata.class);
 
     metadata.getPartitionMetadata().entrySet().stream().forEach(e -> {
       removeFileSlicesForPartition(timeline, instant, e.getKey(), e.getValue().getSuccessDeleteFiles());
     });
-    LOG.info("Done Syncing rollback instant (" + instant + ")");
+    LOG.info("Done Syncing rollback instant ({})", instant);
   }
 
   /**
@@ -259,7 +259,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
    * @param instant Clean instant
    */
   private void addCleanInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
-    LOG.info("Syncing cleaner instant (" + instant + ")");
+    LOG.info("Syncing cleaner instant ({})", instant);
     HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(metaClient, instant);
     cleanMetadata.getPartitionMetadata().entrySet().stream().forEach(entry -> {
       final String basePath = metaClient.getBasePath();
@@ -270,13 +270,13 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
           .collect(Collectors.toList());
       removeFileSlicesForPartition(timeline, instant, entry.getKey(), fullPathList);
     });
-    LOG.info("Done Syncing cleaner instant (" + instant + ")");
+    LOG.info("Done Syncing cleaner instant ({})", instant);
   }
 
   private void removeFileSlicesForPartition(HoodieTimeline timeline, HoodieInstant instant, String partition,
       List<String> paths) {
     if (isPartitionAvailableInStore(partition)) {
-      LOG.info("Removing file slices for partition (" + partition + ") for instant (" + instant + ")");
+      LOG.info("Removing file slices for partition ({}) for instant ({})", partition, instant);
       FileStatus[] statuses = paths.stream().map(p -> {
         FileStatus status = new FileStatus();
         status.setPath(new Path(p));
@@ -286,7 +286,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
           buildFileGroups(statuses, timeline.filterCompletedAndCompactionInstants(), false);
       applyDeltaFileSlicesToPartitionView(partition, fileGroups, DeltaApplyMode.REMOVE);
     } else {
-      LOG.warn("Skipping partition (" + partition + ") when syncing instant (" + instant + ") as it is not loaded");
+      LOG.warn("Skipping partition ({}) when syncing instant ({}) as it is not loaded", partition, instant);
     }
   }
 
@@ -309,7 +309,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
   protected void applyDeltaFileSlicesToPartitionView(String partition, List<HoodieFileGroup> deltaFileGroups,
       DeltaApplyMode mode) {
     if (deltaFileGroups.isEmpty()) {
-      LOG.info("No delta file groups for partition :" + partition);
+      LOG.info("No delta file groups for partition :{}", partition);
       return;
     }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index e9087db..41966a0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -32,8 +32,8 @@ import org.apache.hudi.common.util.Functions.Function3;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.List;
@@ -45,7 +45,7 @@ import java.util.stream.Stream;
  */
 public class PriorityBasedFileSystemView implements SyncableFileSystemView, Serializable {
 
-  private static final Logger LOG = LogManager.getLogger(PriorityBasedFileSystemView.class);
+  private static final Logger LOG = LoggerFactory.getLogger(PriorityBasedFileSystemView.class);
 
   private final SyncableFileSystemView preferredView;
   private final SyncableFileSystemView secondaryView;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index c8e625b..7194ffa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -43,8 +43,8 @@ import com.google.common.base.Preconditions;
 import org.apache.http.client.fluent.Request;
 import org.apache.http.client.fluent.Response;
 import org.apache.http.client.utils.URIBuilder;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -108,7 +108,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
   public static final String INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM = "includependingcompaction";
 
 
-  private static final Logger LOG = LogManager.getLogger(RemoteHoodieTableFileSystemView.class);
+  private static final Logger LOG = LoggerFactory.getLogger(RemoteHoodieTableFileSystemView.class);
 
   private final String serverHost;
   private final int serverPort;
@@ -148,7 +148,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     builder.addParameter(TIMELINE_HASH, timeline.getTimelineHash());
 
     String url = builder.toString();
-    LOG.info("Sending request : (" + url + ")");
+    LOG.info("Sending request : ({})", url);
     Response response = null;
     int timeout = 1000 * 300; // 5 min timeout
     switch (method) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
index 2502cc1..2d100e4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
@@ -34,8 +34,8 @@ import org.apache.hudi.common.util.collection.Pair;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -58,7 +58,7 @@ import java.util.stream.Stream;
  */
 public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSystemView {
 
-  private static final Logger LOG = LogManager.getLogger(RocksDbBasedFileSystemView.class);
+  private static final Logger LOG = LoggerFactory.getLogger(RocksDbBasedFileSystemView.class);
 
   private final FileSystemViewStorageConfig config;
 
@@ -87,7 +87,7 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
   protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline) {
     schemaHelper.getAllColumnFamilies().stream().forEach(rocksDB::addColumnFamily);
     super.init(metaClient, visibleActiveTimeline);
-    LOG.info("Created ROCKSDB based file-system view at " + config.getRocksdbBasePath());
+    LOG.info("Created ROCKSDB based file-system view at {}", config.getRocksdbBasePath());
   }
 
   @Override
@@ -102,7 +102,7 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
         rocksDB.putInBatch(batch, schemaHelper.getColFamilyForPendingCompaction(),
             schemaHelper.getKeyForPendingCompactionLookup(opPair.getValue().getFileGroupId()), opPair);
       });
-      LOG.info("Initializing pending compaction operations. Count=" + batch.count());
+      LOG.info("Initializing pending compaction operations. Count={}", batch.count());
     });
   }
 
@@ -157,8 +157,8 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
 
   @Override
   protected void storePartitionView(String partitionPath, List<HoodieFileGroup> fileGroups) {
-    LOG.info("Resetting and adding new partition (" + partitionPath + ") to ROCKSDB based file-system view at "
-        + config.getRocksdbBasePath() + ", Total file-groups=" + fileGroups.size());
+    LOG.info("Resetting and adding new partition ({}) to ROCKSDB based file-system view at {}, Total file-groups={}",
+        partitionPath, config.getRocksdbBasePath(), fileGroups.size());
 
     String lookupKey = schemaHelper.getKeyForPartitionLookup(partitionPath);
     rocksDB.delete(schemaHelper.getColFamilyForStoredPartitions(), lookupKey);
@@ -184,8 +184,8 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
 
     // record that partition is loaded.
     rocksDB.put(schemaHelper.getColFamilyForStoredPartitions(), lookupKey, Boolean.TRUE);
-    LOG.info("Finished adding new partition (" + partitionPath + ") to ROCKSDB based file-system view at "
-        + config.getRocksdbBasePath() + ", Total file-groups=" + fileGroups.size());
+    LOG.info("Finished adding new partition ({}) to ROCKSDB based file-system view at {}, Total file-groups={}", partitionPath,
+        config.getRocksdbBasePath(), fileGroups.size());
   }
 
   @Override
@@ -202,7 +202,7 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
             return fs;
           } else {
             // First remove the file-slice
-            LOG.info("Removing old Slice in DB. FS=" + oldSlice);
+            LOG.info("Removing old Slice in DB. FS={}", oldSlice);
             rocksDB.deleteInBatch(batch, schemaHelper.getColFamilyForView(),
                 schemaHelper.getKeyForSliceView(fg, oldSlice));
             rocksDB.deleteInBatch(batch, schemaHelper.getColFamilyForView(),
@@ -224,11 +224,11 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
                 deltaLogFiles.entrySet().stream().filter(e -> !logFiles.containsKey(e.getKey()))
                     .forEach(p -> newLogFiles.put(p.getKey(), p.getValue()));
                 newLogFiles.values().stream().forEach(lf -> newFileSlice.addLogFile(lf));
-                LOG.info("Adding back new File Slice after add FS=" + newFileSlice);
+                LOG.info("Adding back new File Slice after add FS={}", newFileSlice);
                 return newFileSlice;
               }
               case REMOVE: {
-                LOG.info("Removing old File Slice =" + fs);
+                LOG.info("Removing old File Slice ={}", fs);
                 FileSlice newFileSlice = new FileSlice(oldSlice.getFileGroupId(), oldSlice.getBaseInstantTime());
                 fs.getDataFile().orElseGet(() -> {
                   oldSlice.getDataFile().ifPresent(df -> newFileSlice.setDataFile(df));
@@ -239,7 +239,7 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
                 // Add remaining log files back
                 logFiles.values().stream().forEach(lf -> newFileSlice.addLogFile(lf));
                 if (newFileSlice.getDataFile().isPresent() || (newFileSlice.getLogFiles().count() > 0)) {
-                  LOG.info("Adding back new file-slice after remove FS=" + newFileSlice);
+                  LOG.info("Adding back new file-slice after remove FS={}", newFileSlice);
                   return newFileSlice;
                 }
                 return null;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
index 3ada17e..3c71fd6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
@@ -28,8 +28,8 @@ import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
 
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -42,7 +42,7 @@ import java.util.stream.Stream;
  */
 public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {
 
-  private static final Logger LOG = LogManager.getLogger(SpillableMapBasedFileSystemView.class);
+  private static final Logger LOG = LoggerFactory.getLogger(SpillableMapBasedFileSystemView.class);
 
   private final long maxMemoryForFileGroupMap;
   private final long maxMemoryForPendingCompaction;
@@ -66,8 +66,8 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {
   @Override
   protected Map<String, List<HoodieFileGroup>> createPartitionToFileGroups() {
     try {
-      LOG.info("Creating Partition To File groups map using external spillable Map. Max Mem=" + maxMemoryForFileGroupMap
-          + ", BaseDir=" + baseStoreDir);
+      LOG.info("Creating Partition To File groups map using external spillable Map.  Max Mem={}, BaseDir={}",
+          maxMemoryForPendingCompaction, baseStoreDir);
       new File(baseStoreDir).mkdirs();
       return (Map<String, List<HoodieFileGroup>>) (new ExternalSpillableMap<>(maxMemoryForFileGroupMap, baseStoreDir,
           new DefaultSizeEstimator(), new DefaultSizeEstimator<>()));
@@ -80,8 +80,8 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {
   protected Map<HoodieFileGroupId, Pair<String, CompactionOperation>> createFileIdToPendingCompactionMap(
       Map<HoodieFileGroupId, Pair<String, CompactionOperation>> fgIdToPendingCompaction) {
     try {
-      LOG.info("Creating Pending Compaction map using external spillable Map. Max Mem=" + maxMemoryForPendingCompaction
-          + ", BaseDir=" + baseStoreDir);
+      LOG.info("Creating Pending Compaction map using external spillable Map. Max Mem={}, BaseDir={}",
+          maxMemoryForPendingCompaction, baseStoreDir);
       new File(baseStoreDir).mkdirs();
       Map<HoodieFileGroupId, Pair<String, CompactionOperation>> pendingMap = new ExternalSpillableMap<>(
           maxMemoryForPendingCompaction, baseStoreDir, new DefaultSizeEstimator(), new DefaultSizeEstimator<>());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
index 7d5f786..a8b35d4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
@@ -32,9 +32,6 @@ import org.apache.hudi.common.versioning.compaction.CompactionV1MigrationHandler
 import org.apache.hudi.common.versioning.compaction.CompactionV2MigrationHandler;
 import org.apache.hudi.exception.HoodieException;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
@@ -48,8 +45,6 @@ import java.util.stream.Stream;
  */
 public class CompactionUtils {
 
-  private static final Logger LOG = LogManager.getLogger(CompactionUtils.class);
-
   public static final Integer COMPACTION_METADATA_VERSION_1 = CompactionV1MigrationHandler.VERSION;
   public static final Integer COMPACTION_METADATA_VERSION_2 = CompactionV2MigrationHandler.VERSION;
   public static final Integer LATEST_COMPACTION_METADATA_VERSION = COMPACTION_METADATA_VERSION_2;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/DFSPropertiesConfiguration.java b/hudi-common/src/main/java/org/apache/hudi/common/util/DFSPropertiesConfiguration.java
index f535cac..a6ee606 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/DFSPropertiesConfiguration.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/DFSPropertiesConfiguration.java
@@ -20,8 +20,8 @@ package org.apache.hudi.common.util;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -41,7 +41,7 @@ import java.util.Set;
  */
 public class DFSPropertiesConfiguration {
 
-  private static final Logger LOG = LogManager.getLogger(DFSPropertiesConfiguration.class);
+  private static final Logger LOG = LoggerFactory.getLogger(DFSPropertiesConfiguration.class);
 
   private final FileSystem fs;
 
@@ -87,7 +87,7 @@ public class DFSPropertiesConfiguration {
       BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(file)));
       addProperties(reader);
     } catch (IOException ioe) {
-      LOG.error("Error reading in properies from dfs", ioe);
+      LOG.error("Error reading in properties from dfs", ioe);
       throw new IllegalArgumentException("Cannot read properties from dfs", ioe);
     }
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java
index d9161e5..2860983 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java
@@ -39,8 +39,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -60,7 +60,7 @@ import java.util.stream.Stream;
  */
 public class FSUtils {
 
-  private static final Logger LOG = LogManager.getLogger(FSUtils.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
   // Log files are of this pattern - .b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1
   private static final Pattern LOG_FILE_PATTERN =
       Pattern.compile("\\.(.*)_(.*)\\.(.*)\\.([0-9]*)(_(([0-9]*)-([0-9]*)-([0-9]*)))?");
@@ -84,7 +84,7 @@ public class FSUtils {
     // look for all properties, prefixed to be picked up
     for (Entry<String, String> prop : System.getenv().entrySet()) {
       if (prop.getKey().startsWith(HOODIE_ENV_PROPS_PREFIX)) {
-        LOG.info("Picking up value for hoodie env var :" + prop.getKey());
+        LOG.info("Picking up value for hoodie env var :{}", prop.getKey());
         conf.set(prop.getKey().replace(HOODIE_ENV_PROPS_PREFIX, "").replaceAll("_DOT_", "."), prop.getValue());
       }
     }
@@ -461,11 +461,11 @@ public class FSUtils {
    */
   public static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p)
       throws IOException, InterruptedException {
-    LOG.info("Recover lease on dfs file " + p);
+    LOG.info("Recover lease on dfs file {}", p);
     // initiate the recovery
     boolean recovered = false;
     for (int nbAttempt = 0; nbAttempt < MAX_ATTEMPTS_RECOVER_LEASE; nbAttempt++) {
-      LOG.info("Attempt " + nbAttempt + " to recover lease on dfs file " + p);
+      LOG.info("Attempt {} to recover lease on dfs file {}", nbAttempt, p);
       recovered = dfs.recoverLease(p);
       if (recovered) {
         break;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FailSafeConsistencyGuard.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FailSafeConsistencyGuard.java
index b4a0991..248e8e5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/FailSafeConsistencyGuard.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FailSafeConsistencyGuard.java
@@ -22,8 +22,8 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -39,7 +39,7 @@ import java.util.stream.Collectors;
  */
 public class FailSafeConsistencyGuard implements ConsistencyGuard {
 
-  private static final Logger LOG = LogManager.getLogger(FailSafeConsistencyGuard.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FailSafeConsistencyGuard.class);
 
   private final FileSystem fs;
   private final ConsistencyGuardConfig consistencyGuardConfig;
@@ -86,7 +86,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
 
     retryTillSuccess((retryNum) -> {
       try {
-        LOG.info("Trying " + retryNum);
+        LOG.info("Trying {}", retryNum);
         FileStatus[] entries = fs.listStatus(dir);
         List<String> gotFiles = Arrays.stream(entries).map(e -> Path.getPathWithoutSchemeAndAuthority(e.getPath()))
             .map(p -> p.toString()).collect(Collectors.toList());
@@ -95,7 +95,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
 
         switch (event) {
           case DISAPPEAR:
-            LOG.info("Following files are visible" + candidateFiles);
+            LOG.info("Following files are visible {}", candidateFiles);
             // If no candidate files gets removed, it means all of them have disappeared
             return !altered;
           case APPEAR:
@@ -104,7 +104,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
             return candidateFiles.isEmpty();
         }
       } catch (IOException ioe) {
-        LOG.warn("Got IOException waiting for file event. Have tried " + retryNum + " time(s)", ioe);
+        LOG.warn("Got IOException waiting for file event. Have tried {} time(s)", retryNum, ioe);
       }
       return false;
     }, "Timed out waiting for files to become visible");
@@ -176,7 +176,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
   private void retryTillSuccess(Function<Integer, Boolean> predicate, String timedOutMessage) throws TimeoutException {
     long waitMs = consistencyGuardConfig.getInitialConsistencyCheckIntervalMs();
     int attempt = 0;
-    LOG.info("Max Attempts=" + consistencyGuardConfig.getMaxConsistencyChecks());
+    LOG.info("Max Attempts={}", consistencyGuardConfig.getMaxConsistencyChecks());
     while (attempt < consistencyGuardConfig.getMaxConsistencyChecks()) {
       boolean success = predicate.apply(attempt);
       if (success) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordSizeEstimator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordSizeEstimator.java
index 387b8d0..269ce49 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordSizeEstimator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordSizeEstimator.java
@@ -23,8 +23,8 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 
 import org.apache.avro.Schema;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Size Estimator for Hoodie record payload.
@@ -33,7 +33,7 @@ import org.apache.log4j.Logger;
  */
 public class HoodieRecordSizeEstimator<T extends HoodieRecordPayload> implements SizeEstimator<HoodieRecord<T>> {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieRecordSizeEstimator.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieRecordSizeEstimator.class);
 
   // Schema used to get GenericRecord from HoodieRecordPayload then convert to bytes and vice-versa
   private final Schema schema;
@@ -50,7 +50,7 @@ public class HoodieRecordSizeEstimator<T extends HoodieRecordPayload> implements
     /** {@link ExternalSpillableMap} **/
     long sizeOfRecord = ObjectSizeCalculator.getObjectSize(hoodieRecord);
     long sizeOfSchema = ObjectSizeCalculator.getObjectSize(schema);
-    LOG.info("SizeOfRecord => " + sizeOfRecord + " SizeOfSchema => " + sizeOfSchema);
+    LOG.info("SizeOfRecord => {} SizeOfSchema => {}", sizeOfRecord, sizeOfSchema);
     return sizeOfRecord;
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBDAO.java b/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBDAO.java
index 59af74b..4fcb16d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBDAO.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBDAO.java
@@ -24,8 +24,8 @@ import org.apache.hudi.exception.HoodieIOException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
@@ -55,7 +55,7 @@ import java.util.stream.Stream;
  */
 public class RocksDBDAO {
 
-  private static final Logger LOG = LogManager.getLogger(RocksDBDAO.class);
+  private static final Logger LOG = LoggerFactory.getLogger(RocksDBDAO.class);
 
   private transient ConcurrentHashMap<String, ColumnFamilyHandle> managedHandlesMap;
   private transient ConcurrentHashMap<String, ColumnFamilyDescriptor> managedDescriptorMap;
@@ -86,7 +86,7 @@ public class RocksDBDAO {
    */
   private void init() throws HoodieException {
     try {
-      LOG.info("DELETING RocksDB persisted at " + rocksDBBasePath);
+      LOG.info("DELETING RocksDB persisted at {}", rocksDBBasePath);
       FileIOUtils.deleteDirectory(new File(rocksDBBasePath));
 
       managedHandlesMap = new ConcurrentHashMap<>();
@@ -99,7 +99,7 @@ public class RocksDBDAO {
       dbOptions.setLogger(new org.rocksdb.Logger(dbOptions) {
         @Override
         protected void log(InfoLogLevel infoLogLevel, String logMsg) {
-          LOG.info("From Rocks DB : " + logMsg);
+          LOG.info("From Rocks DB : {}", logMsg);
         }
       });
       final List<ColumnFamilyDescriptor> managedColumnFamilies = loadManagedColumnFamilies(dbOptions);
@@ -138,7 +138,7 @@ public class RocksDBDAO {
       LOG.info("No column family found. Loading default");
       managedColumnFamilies.add(getColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
     } else {
-      LOG.info("Loading column families :" + existing.stream().map(String::new).collect(Collectors.toList()));
+      LOG.info("Loading column families :{}", existing.stream().map(String::new).collect(Collectors.toList()));
       managedColumnFamilies
           .addAll(existing.stream().map(RocksDBDAO::getColumnFamilyDescriptor).collect(Collectors.toList()));
     }
@@ -352,8 +352,8 @@ public class RocksDBDAO {
       }
     }
 
-    LOG.info("Prefix Search for (query=" + prefix + ") on " + columnFamilyName + ". Total Time Taken (msec)="
-        + timer.endTimer() + ". Serialization Time taken(micro)=" + timeTakenMicro + ", num entries=" + results.size());
+    LOG.info("Prefix Search for (query={}) on {}. Total Time Taken (msec)={}. Serialization Time taken(micro)={},"
+        + " num entries={}", prefix, columnFamilyName, timer.endTimer(), timeTakenMicro, results.size());
     return results.stream();
   }
 
@@ -366,7 +366,7 @@ public class RocksDBDAO {
    */
   public <T extends Serializable> void prefixDelete(String columnFamilyName, String prefix) {
     Preconditions.checkArgument(!closed);
-    LOG.info("Prefix DELETE (query=" + prefix + ") on " + columnFamilyName);
+    LOG.info("Prefix DELETE (query={}) on {}", prefix, columnFamilyName);
     final RocksIterator it = getRocksDB().newIterator(managedHandlesMap.get(columnFamilyName));
     it.seek(prefix.getBytes());
     // Find first and last keys to be deleted
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/TimelineDiffHelper.java b/hudi-common/src/main/java/org/apache/hudi/common/util/TimelineDiffHelper.java
index 7625bb5..fb49243 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/TimelineDiffHelper.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/TimelineDiffHelper.java
@@ -23,8 +23,8 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.util.collection.Pair;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -36,7 +36,7 @@ import java.util.stream.Collectors;
  */
 public class TimelineDiffHelper {
 
-  private static final Logger LOG = LogManager.getLogger(TimelineDiffHelper.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TimelineDiffHelper.class);
 
   public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline oldTimeline,
       HoodieTimeline newTimeline) {
@@ -64,8 +64,8 @@ public class TimelineDiffHelper {
       if (!lostPendingCompactions.isEmpty()) {
         // If a compaction is unscheduled, fall back to complete refresh of fs view since some log files could have been
         // moved. Its unsafe to incrementally sync in that case.
-        LOG.warn("Some pending compactions are no longer in new timeline (unscheduled ?). They are :"
-            + lostPendingCompactions);
+        LOG.warn("Some pending compactions are no longer in new timeline (unscheduled ?). They are :{}",
+            lostPendingCompactions);
         return TimelineDiffResult.UNSAFE_SYNC_RESULT;
       }
       List<HoodieInstant> finishedCompactionInstants = compactionInstants.stream()
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java
index e764a17..47c0f6a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java
@@ -25,8 +25,8 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -54,7 +54,7 @@ import java.util.stream.Stream;
  */
 public final class DiskBasedMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Iterable<R> {
 
-  private static final Logger LOG = LogManager.getLogger(DiskBasedMap.class);
+  private static final Logger LOG = LoggerFactory.getLogger(DiskBasedMap.class);
   // Stores the key and corresponding value's latest metadata spilled to disk
   private final Map<T, ValueMetadata> valueMetadataMap;
   // Write only file
@@ -111,9 +111,8 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
       writeOnlyFile.getParentFile().mkdir();
     }
     writeOnlyFile.createNewFile();
-    LOG.info("Spilling to file location " + writeOnlyFile.getAbsolutePath() + " in host ("
-        + InetAddress.getLocalHost().getHostAddress() + ") with hostname (" + InetAddress.getLocalHost().getHostName()
-        + ")");
+    LOG.info("Spilling to file location {} in host ({}) with hostname ({})", writeOnlyFile.getAbsolutePath(),
+        InetAddress.getLocalHost().getHostAddress(), InetAddress.getLocalHost().getHostName());
     // Make sure file is deleted when JVM exits
     writeOnlyFile.deleteOnExit();
     addShutDownHook();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
index 32c41f7..b68cd9a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
@@ -22,8 +22,8 @@ import org.apache.hudi.common.util.ObjectSizeCalculator;
 import org.apache.hudi.common.util.SizeEstimator;
 import org.apache.hudi.exception.HoodieIOException;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -56,7 +56,7 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
 
   // Find the actual estimated payload size after inserting N records
   private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE = 100;
-  private static final Logger LOG = LogManager.getLogger(ExternalSpillableMap.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ExternalSpillableMap.class);
   // maximum space allowed in-memory for this map
   private final long maxInMemorySizeInBytes;
   // Map to store key-values in memory until it hits maxInMemorySizeInBytes
@@ -177,7 +177,7 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
         // At first, use the sizeEstimate of a record being inserted into the spillable map.
         // Note, the converter may over estimate the size of a record in the JVM
         this.estimatedPayloadSize = keySizeEstimator.sizeEstimate(key) + valueSizeEstimator.sizeEstimate(value);
-        LOG.info("Estimated Payload size => " + estimatedPayloadSize);
+        LOG.info("Estimated Payload size => {}", estimatedPayloadSize);
       } else if (shouldEstimatePayloadSize && inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0) {
         // Re-estimate the size of a record by calculating the size of the entire map containing
         // N entries and then dividing by the number of entries present (N). This helps to get a
@@ -186,7 +186,7 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
         this.currentInMemoryMapSize = totalMapSize;
         this.estimatedPayloadSize = totalMapSize / inMemoryMap.size();
         shouldEstimatePayloadSize = false;
-        LOG.info("New Estimated Payload size => " + this.estimatedPayloadSize);
+        LOG.info("New Estimated Payload size => {}", this.estimatedPayloadSize);
       }
       if (!inMemoryMap.containsKey(key)) {
         // TODO : Add support for adjusting payloadSize for updates to the same key
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
index cec9ab6..68fd3cf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
@@ -23,8 +23,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SizeEstimator;
 import org.apache.hudi.exception.HoodieException;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.List;
@@ -44,7 +44,7 @@ import java.util.stream.Collectors;
  */
 public class BoundedInMemoryExecutor<I, O, E> {
 
-  private static final Logger LOG = LogManager.getLogger(BoundedInMemoryExecutor.class);
+  private static final Logger LOG = LoggerFactory.getLogger(BoundedInMemoryExecutor.class);
 
   // Executor service used for launching writer thread.
   private final ExecutorService executorService;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java
index 2c5ce5d..319d552 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java
@@ -25,8 +25,8 @@ import org.apache.hudi.exception.HoodieException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -58,7 +58,7 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
   public static final int RECORD_SAMPLING_RATE = 64;
   // maximum records that will be cached
   private static final int RECORD_CACHING_LIMIT = 128 * 1024;
-  private static final Logger LOG = LogManager.getLogger(BoundedInMemoryQueue.class);
+  private static final Logger LOG = LoggerFactory.getLogger(BoundedInMemoryQueue.class);
   // It indicates number of records to cache. We will be using sampled record's average size to
   // determine how many
   // records we should cache and will change (increase/decrease) permits accordingly.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java
index 5496837..c70db12 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java
@@ -18,8 +18,8 @@
 
 package org.apache.hudi.common.util.queue;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.function.Function;
 
@@ -30,7 +30,7 @@ import java.util.function.Function;
  */
 public class FunctionBasedQueueProducer<I> implements BoundedInMemoryQueueProducer<I> {
 
-  private static final Logger LOG = LogManager.getLogger(FunctionBasedQueueProducer.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FunctionBasedQueueProducer.class);
 
   private final Function<BoundedInMemoryQueue<I, ?>, Boolean> producerFunction;
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java
index 3d11f38..c4719a8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java
@@ -18,8 +18,8 @@
 
 package org.apache.hudi.common.util.queue;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
 
@@ -30,7 +30,7 @@ import java.util.Iterator;
  */
 public class IteratorBasedQueueProducer<I> implements BoundedInMemoryQueueProducer<I> {
 
-  private static final Logger LOG = LogManager.getLogger(IteratorBasedQueueProducer.class);
+  private static final Logger LOG = LoggerFactory.getLogger(IteratorBasedQueueProducer.class);
 
   // input iterator for producing items in the buffer.
   private final Iterator<I> inputIterator;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java b/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java
index 9bc9a8d..02ee92a 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java
@@ -27,8 +27,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -38,7 +38,7 @@ import java.io.IOException;
  */
 public class HdfsTestService {
 
-  private static final Logger LOG = LogManager.getLogger(HdfsTestService.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HdfsTestService.class);
 
   /**
    * Configuration settings.
@@ -72,7 +72,7 @@ public class HdfsTestService {
     // If clean, then remove the work dir so we can start fresh.
     String localDFSLocation = getDFSLocation(workDir);
     if (format) {
-      LOG.info("Cleaning HDFS cluster data at: " + localDFSLocation + " and starting fresh.");
+      LOG.info("Cleaning HDFS cluster data at: {} and starting fresh.", localDFSLocation);
       File file = new File(localDFSLocation);
       FileIOUtils.deleteDirectory(file);
     }
@@ -115,7 +115,7 @@ public class HdfsTestService {
   private static Configuration configureDFSCluster(Configuration config, String localDFSLocation, String bindIP,
       int namenodeRpcPort, int datanodePort, int datanodeIpcPort, int datanodeHttpPort) {
 
-    LOG.info("HDFS force binding to ip: " + bindIP);
+    LOG.info("HDFS force binding to ip: {}", bindIP);
     config.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + bindIP + ":" + namenodeRpcPort);
     config.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, bindIP + ":" + datanodePort);
     config.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, bindIP + ":" + datanodeIpcPort);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/minicluster/ZookeeperTestService.java b/hudi-common/src/test/java/org/apache/hudi/common/minicluster/ZookeeperTestService.java
index 670be44..c2dadd3 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/minicluster/ZookeeperTestService.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/minicluster/ZookeeperTestService.java
@@ -22,8 +22,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.io.Files;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnLog;
@@ -53,7 +53,7 @@ import java.net.Socket;
  */
 public class ZookeeperTestService {
 
-  private static final Logger LOG = LogManager.getLogger(ZookeeperTestService.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ZookeeperTestService.class);
 
   private static final int TICK_TIME = 2000;
   private static final int CONNECTION_TIMEOUT = 30000;
@@ -103,7 +103,7 @@ public class ZookeeperTestService {
 
     // NOTE: Changed from the original, where InetSocketAddress was
     // originally created to bind to the wildcard IP, we now configure it.
-    LOG.info("Zookeeper force binding to: " + this.bindIP);
+    LOG.info("Zookeeper force binding to: {}", this.bindIP);
     standaloneServerFactory.configure(new InetSocketAddress(bindIP, clientPort), 1000);
 
     // Start up this ZK server
@@ -120,7 +120,7 @@ public class ZookeeperTestService {
     }
 
     started = true;
-    LOG.info("Zookeeper Minicluster service started on client port: " + clientPort);
+    LOG.info("Zookeeper Minicluster service started on client port: {}", clientPort);
     return zooKeeperServer;
   }
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
index 8828402..6dc9d8a 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
@@ -43,8 +43,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -71,7 +71,7 @@ import static org.junit.Assert.assertTrue;
 @SuppressWarnings("ResultOfMethodCallIgnored")
 public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
 
-  private static final Logger LOG = LogManager.getLogger(TestHoodieTableFileSystemView.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TestHoodieTableFileSystemView.class);
 
   private static String TEST_WRITE_TOKEN = "1-0-1";
 
@@ -489,7 +489,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
     roView.getAllDataFiles(partitionPath);
 
     fileSliceList = rtView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
-    LOG.info("FILESLICE LIST=" + fileSliceList);
+    LOG.info("FILESLICE LIST={}", fileSliceList);
     dataFiles = fileSliceList.stream().map(FileSlice::getDataFile).filter(Option::isPresent).map(Option::get)
         .collect(Collectors.toList());
     assertEquals("Expect only one data-files in latest view as there is only one file-group", 1, dataFiles.size());