Posted to commits@hudi.apache.org by pw...@apache.org on 2021/08/05 07:32:19 UTC

[hudi] branch redo-log created (now 8f8cd42)

This is an automated email from the ASF dual-hosted git repository.

pwason pushed a change to branch redo-log
in repository https://gitbox.apache.org/repos/asf/hudi.git.


      at 8f8cd42  [HUDI-458] Redo hudi-hadoop-mr log statements using SLF4J (#1210)

This branch includes the following new commits:

     new ac105e6  [HUDI-459] Redo hudi-hive log statements using SLF4J (#1203)
     new a0dc09e  [HUDI-457]Redo hudi-common log statements using SLF4J (#1161)
     new 8f8cd42  [HUDI-458] Redo hudi-hadoop-mr log statements using SLF4J (#1210)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[hudi] 03/03: [HUDI-458] Redo hudi-hadoop-mr log statements using SLF4J (#1210)

Posted by pw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch redo-log
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 8f8cd42e9792e0a9648b68f933929ad87cd503c5
Author: Mathieu <49...@users.noreply.github.com>
AuthorDate: Mon Jan 13 11:18:09 2020 +0800

    [HUDI-458] Redo hudi-hadoop-mr log statements using SLF4J (#1210)
---
 hudi-hadoop-mr/pom.xml                             |  7 ++++++
 .../org/apache/hudi/hadoop/HoodieHiveUtil.java     | 12 +++++-----
 .../hudi/hadoop/HoodieParquetInputFormat.java      | 26 ++++++++++----------
 .../hudi/hadoop/HoodieROTablePathFilter.java       | 28 +++++++++++-----------
 .../hudi/hadoop/RecordReaderValueIterator.java     |  6 ++---
 .../hadoop/hive/HoodieCombineHiveInputFormat.java  | 27 ++++++++++-----------
 .../realtime/AbstractRealtimeRecordReader.java     | 24 +++++++++----------
 .../realtime/HoodieParquetRealtimeInputFormat.java | 22 ++++++++---------
 .../realtime/HoodieRealtimeRecordReader.java       |  6 ++---
 .../realtime/RealtimeCompactedRecordReader.java    | 14 +++++------
 10 files changed, 89 insertions(+), 83 deletions(-)
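
The pattern repeated in every file below is the same two-part change: the Log4j 1.x
LogManager/Logger pair is replaced with the SLF4J API, and string-concatenated messages
become parameterized ones. A minimal sketch of that pattern (class and message are
illustrative, not taken from the diff):

    // Log4j 1.x style being removed:
    //   import org.apache.log4j.LogManager;
    //   import org.apache.log4j.Logger;
    //   private static final Logger LOG = LogManager.getLogger(Example.class);
    //   LOG.info("Found a total of " + count + " groups");  // message built even when INFO is off

    // SLF4J style being introduced:
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class Example {
      private static final Logger LOG = LoggerFactory.getLogger(Example.class);

      void run(int count) {
        // {} placeholders defer formatting until the logger knows INFO is enabled
        LOG.info("Found a total of {} groups", count);
      }
    }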

diff --git a/hudi-hadoop-mr/pom.xml b/hudi-hadoop-mr/pom.xml
index 6bc3c8e..2a222f3 100644
--- a/hudi-hadoop-mr/pom.xml
+++ b/hudi-hadoop-mr/pom.xml
@@ -81,6 +81,13 @@
       <artifactId>hive-exec</artifactId>
     </dependency>
 
+    <!-- Logging -->
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+
     <!-- Hoodie - Test -->
     <dependency>
       <groupId>org.apache.hudi</groupId>
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
index 1db8c54..f371719 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
@@ -20,12 +20,12 @@ package org.apache.hudi.hadoop;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HoodieHiveUtil {
 
-  public static final Logger LOG = LogManager.getLogger(HoodieHiveUtil.class);
+  public static final Logger LOG = LoggerFactory.getLogger(HoodieHiveUtil.class);
 
   public static final String HOODIE_CONSUME_MODE_PATTERN = "hoodie.%s.consume.mode";
   public static final String HOODIE_START_COMMIT_PATTERN = "hoodie.%s.consume.start.timestamp";
@@ -43,20 +43,20 @@ public class HoodieHiveUtil {
     if (maxCommits == MAX_COMMIT_ALL) {
       maxCommits = Integer.MAX_VALUE;
     }
-    LOG.info("Read max commits - " + maxCommits);
+    LOG.info("Read max commits - {}", maxCommits);
     return maxCommits;
   }
 
   public static String readStartCommitTime(JobContext job, String tableName) {
     String startCommitTimestampName = String.format(HOODIE_START_COMMIT_PATTERN, tableName);
-    LOG.info("Read start commit time - " + job.getConfiguration().get(startCommitTimestampName));
+    LOG.info("Read start commit time - {}", job.getConfiguration().get(startCommitTimestampName));
     return job.getConfiguration().get(startCommitTimestampName);
   }
 
   public static String readMode(JobContext job, String tableName) {
     String modePropertyName = String.format(HOODIE_CONSUME_MODE_PATTERN, tableName);
     String mode = job.getConfiguration().get(modePropertyName, DEFAULT_SCAN_MODE);
-    LOG.info(modePropertyName + ": " + mode);
+    LOG.info("Hoodie consume mode pattern is : {}, mode is : {}", modePropertyName, mode);
     return mode;
   }
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
index e8f7de0..ea92f11 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
@@ -42,8 +42,8 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -60,7 +60,7 @@ import java.util.stream.Collectors;
 @UseFileSplitsFromInputFormat
 public class HoodieParquetInputFormat extends MapredParquetInputFormat implements Configurable {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieParquetInputFormat.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieParquetInputFormat.class);
 
   protected Configuration conf;
 
@@ -69,7 +69,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
     // Get all the file status from FileInputFormat and then do the filter
     FileStatus[] fileStatuses = super.listStatus(job);
     Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus = groupFileStatus(fileStatuses);
-    LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
+    LOG.info("Found a total of {} groups", groupedFileStatus.size());
     List<FileStatus> returns = new ArrayList<>();
     for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
       HoodieTableMetaClient metadata = entry.getKey();
@@ -81,7 +81,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
 
       FileStatus[] statuses = entry.getValue().toArray(new FileStatus[entry.getValue().size()]);
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
+        LOG.debug("Hoodie Metadata initialized with completed commit Ts as : {}", metadata);
       }
       String tableName = metadata.getTableConfig().getTableName();
       String mode = HoodieHiveUtil.readMode(Job.getInstance(job), tableName);
@@ -95,24 +95,24 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
         String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
         // Total number of commits to return in this batch. Set this to -1 to get all the commits.
         Integer maxCommits = HoodieHiveUtil.readMaxCommits(Job.getInstance(job), tableName);
-        LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
+        LOG.info("Last Incremental timestamp was set as {}", lastIncrementalTs);
         List<String> commitsToReturn = timeline.findInstantsAfter(lastIncrementalTs, maxCommits).getInstants()
             .map(HoodieInstant::getTimestamp).collect(Collectors.toList());
         List<HoodieDataFile> filteredFiles =
             roView.getLatestDataFilesInRange(commitsToReturn).collect(Collectors.toList());
         for (HoodieDataFile filteredFile : filteredFiles) {
-          LOG.info("Processing incremental hoodie file - " + filteredFile.getPath());
+          LOG.info("Processing incremental hoodie file - {}", filteredFile.getPath());
           filteredFile = checkFileStatus(filteredFile);
           returns.add(filteredFile.getFileStatus());
         }
-        LOG.info("Total paths to process after hoodie incremental filter " + filteredFiles.size());
+        LOG.info("Total paths to process after hoodie incremental filter {}", filteredFiles.size());
       } else {
         // filter files on the latest commit found
         List<HoodieDataFile> filteredFiles = roView.getLatestDataFiles().collect(Collectors.toList());
-        LOG.info("Total paths to process after hoodie filter " + filteredFiles.size());
+        LOG.info("Total paths to process after hoodie filter {}", filteredFiles.size());
         for (HoodieDataFile filteredFile : filteredFiles) {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
+            LOG.debug("Processing latest hoodie file - {}", filteredFile.getPath());
           }
           filteredFile = checkFileStatus(filteredFile);
           returns.add(filteredFile.getFileStatus());
@@ -133,7 +133,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
     try {
       if (dataFile.getFileSize() == 0) {
         FileSystem fs = dataPath.getFileSystem(conf);
-        LOG.info("Refreshing file status " + dataFile.getPath());
+        LOG.info("Refreshing file status {}", dataFile.getPath());
         return new HoodieDataFile(fs.getFileStatus(dataPath));
       }
       return dataFile;
@@ -160,7 +160,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
           metadata = getTableMetaClient(status.getPath().getFileSystem(conf), status.getPath().getParent());
           nonHoodieBasePath = null;
         } catch (DatasetNotFoundException | InvalidDatasetException e) {
-          LOG.info("Handling a non-hoodie path " + status.getPath());
+          LOG.info("Handling a non-hoodie path {}", status.getPath());
           metadata = null;
           nonHoodieBasePath = status.getPath().getParent().toString();
         }
@@ -213,7 +213,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
       levels = metadata.getPartitionDepth();
     }
     Path baseDir = HoodieHiveUtil.getNthParent(dataPath, levels);
-    LOG.info("Reading hoodie metadata from path " + baseDir.toString());
+    LOG.info("Reading hoodie metadata from path {}", baseDir);
     return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
   }
 }
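
A small detail in the hunk above: the explicit baseDir.toString() is dropped because SLF4J
stringifies the placeholder argument itself, and only when the message is actually emitted.
A sketch of the same call with an illustrative path value:

    import org.apache.hadoop.fs.Path;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LazyToStringSketch {
      private static final Logger LOG = LoggerFactory.getLogger(LazyToStringSketch.class);

      void logBaseDir() {
        Path baseDir = new Path("/tmp/hoodie/table");  // hypothetical value, for illustration only
        // SLF4J calls toString() on baseDir lazily while rendering the message
        LOG.info("Reading hoodie metadata from path {}", baseDir);
      }
    }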
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index fae8111..4670e4f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -50,7 +50,7 @@ import java.util.stream.Collectors;
  */
 public class HoodieROTablePathFilter implements PathFilter, Serializable {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieROTablePathFilter.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieROTablePathFilter.class);
 
   /**
    * Its quite common, to have all files from a given partition path be passed into accept(), cache the check for hoodie
@@ -88,7 +88,7 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
   public boolean accept(Path path) {
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Checking acceptance for path " + path);
+      LOG.debug("Checking acceptance for path {}", path);
     }
     Path folder = null;
     try {
@@ -101,15 +101,15 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
       // Try to use the caches.
       if (nonHoodiePathCache.contains(folder.toString())) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Accepting non-hoodie path from cache: " + path);
+          LOG.debug("Accepting non-hoodie path from cache: {}", path);
         }
         return true;
       }
 
       if (hoodiePathCache.containsKey(folder.toString())) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("%s Hoodie path checked against cache, accept => %s \n", path,
-              hoodiePathCache.get(folder.toString()).contains(path)));
+          LOG.debug("{} Hoodie path checked against cache, accept => {}", path,
+              hoodiePathCache.get(folder.toString()).contains(path));
         }
         return hoodiePathCache.get(folder.toString()).contains(path);
       }
@@ -119,7 +119,7 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
       if (filePath.contains("/" + HoodieTableMetaClient.METAFOLDER_NAME + "/")
           || filePath.endsWith("/" + HoodieTableMetaClient.METAFOLDER_NAME)) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("Skipping Hoodie Metadata file  %s \n", filePath));
+          LOG.debug("Skipping Hoodie Metadata file {}", filePath);
         }
         return false;
       }
@@ -144,22 +144,22 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
           if (!hoodiePathCache.containsKey(folder.toString())) {
             hoodiePathCache.put(folder.toString(), new HashSet<>());
           }
-          LOG.info("Based on hoodie metadata from base path: " + baseDir.toString() + ", caching " + latestFiles.size()
-              + " files under " + folder);
+          LOG.info("Based on hoodie metadata from base path: {}, caching {} files under {}", baseDir,
+              latestFiles.size(), folder);
           for (HoodieDataFile lfile : latestFiles) {
             hoodiePathCache.get(folder.toString()).add(new Path(lfile.getPath()));
           }
 
           // accept the path, if its among the latest files.
           if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format("%s checked after cache population, accept => %s \n", path,
-                hoodiePathCache.get(folder.toString()).contains(path)));
+            LOG.debug("{} checked after cache population, accept => {}", path,
+                hoodiePathCache.get(folder.toString()).contains(path));
           }
           return hoodiePathCache.get(folder.toString()).contains(path);
         } catch (DatasetNotFoundException e) {
           // Non-hoodie path, accept it.
           if (LOG.isDebugEnabled()) {
-            LOG.debug(String.format("(1) Caching non-hoodie path under %s \n", folder.toString()));
+            LOG.debug("(1) Caching non-hoodie path under {}", folder);
           }
           nonHoodiePathCache.add(folder.toString());
           return true;
@@ -167,7 +167,7 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
       } else {
         // files is at < 3 level depth in FS tree, can't be hoodie dataset
         if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("(2) Caching non-hoodie path under %s \n", folder.toString()));
+          LOG.debug("(2) Caching non-hoodie path under {}", folder);
         }
         nonHoodiePathCache.add(folder.toString());
         return true;
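
Note that the isDebugEnabled() guards are kept even after the move to parameterized logging:
SLF4J defers building the message string, but the arguments are still evaluated at the call
site, so the guard is what avoids the cache lookups when DEBUG is disabled. A self-contained
sketch with illustrative names:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class DebugGuardSketch {
      private static final Logger LOG = LoggerFactory.getLogger(DebugGuardSketch.class);
      private final Map<String, Set<String>> hoodiePathCache = new HashMap<>();

      boolean accept(String folder, String path) {
        if (LOG.isDebugEnabled()) {
          // this lookup only runs when DEBUG is enabled
          LOG.debug("{} checked against cache, accept => {}", path,
              hoodiePathCache.getOrDefault(folder, new HashSet<>()).contains(path));
        }
        return hoodiePathCache.getOrDefault(folder, new HashSet<>()).contains(path);
      }
    }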
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java
index 0386186..7ffa3bf 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java
@@ -21,8 +21,8 @@ package org.apache.hudi.hadoop;
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.hadoop.mapred.RecordReader;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -36,7 +36,7 @@ import java.util.NoSuchElementException;
  */
 public class RecordReaderValueIterator<K, V> implements Iterator<V> {
 
-  private static final Logger LOG = LogManager.getLogger(RecordReaderValueIterator.class);
+  private static final Logger LOG = LoggerFactory.getLogger(RecordReaderValueIterator.class);
 
   private final RecordReader<K, V> reader;
   private V nextVal = null;
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
index cd1cea3..4ed7ba9 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
@@ -57,8 +57,8 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
 import org.apache.hadoop.mapred.lib.CombineFileSplit;
 import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -93,7 +93,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
     extends HiveInputFormat<K, V> {
 
   private static final String CLASS_NAME = HoodieCombineHiveInputFormat.class.getName();
-  public static final Logger LOG = LogManager.getLogger(CLASS_NAME);
+  public static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
   // max number of threads we can use to check non-combinable paths
   private static final int MAX_CHECK_NONCOMBINABLE_THREAD_NUM = 50;
@@ -125,7 +125,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
         if (inputFormat instanceof AvoidSplitCombination
             && ((AvoidSplitCombination) inputFormat).shouldSkipCombine(paths[i + start], conf)) {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("The path [" + paths[i + start] + "] is being parked for HiveInputFormat.getSplits");
+            LOG.debug("The path [{}] is being parked for HiveInputFormat.getSplits", paths[i + start]);
           }
           nonCombinablePathIndices.add(i + start);
         }
@@ -388,7 +388,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
       Class inputFormatClass = part.getInputFileFormatClass();
       String inputFormatClassName = inputFormatClass.getName();
       InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
-      LOG.info("Input Format => " + inputFormatClass.getName());
+      LOG.info("Input Format => {}", inputFormatClass.getName());
       // **MOD** Set the hoodie filter in the combine
       if (inputFormatClass.getName().equals(HoodieParquetInputFormat.class.getName())) {
         combine.setHoodieFilter(true);
@@ -428,11 +428,11 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
         f = poolMap.get(combinePathInputFormat);
         if (f == null) {
           f = new CombineFilter(filterPath);
-          LOG.info("CombineHiveInputSplit creating pool for " + path + "; using filter path " + filterPath);
+          LOG.info("CombineHiveInputSplit creating pool for {}; using filter path {}", path, filterPath);
           combine.createPool(job, f);
           poolMap.put(combinePathInputFormat, f);
         } else {
-          LOG.info("CombineHiveInputSplit: pool is already created for " + path + "; using filter path " + filterPath);
+          LOG.info("CombineHiveInputSplit: pool is already created for {}; using filter path {}", path, filterPath);
           f.addPath(filterPath);
         }
       } else {
@@ -481,7 +481,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
       result.add(csplit);
     }
 
-    LOG.info("number of splits " + result.size());
+    LOG.info("number of splits {}", result.size());
     return result.toArray(new CombineHiveInputSplit[result.size()]);
   }
 
@@ -491,8 +491,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
   @VisibleForTesting
   public Set<Integer> getNonCombinablePathIndices(JobConf job, Path[] paths, int numThreads)
       throws ExecutionException, InterruptedException {
-    LOG.info("Total number of paths: " + paths.length + ", launching " + numThreads
-        + " threads to check non-combinable ones.");
+    LOG.info("Total number of paths: {}, launching {} threads to check non-combinable ones.", paths.length, numThreads);
     int numPathPerThread = (int) Math.ceil((double) paths.length / numThreads);
 
     ExecutorService executor = Executors.newFixedThreadPool(numThreads);
@@ -555,8 +554,8 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
     // Store the previous value for the path specification
     String oldPaths = job.get(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("The received input paths are: [" + oldPaths + "] against the property "
-          + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
+      LOG.debug("The received input paths are: [{}] against the property {}", oldPaths,
+          org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
     }
 
     // Process the normal splits
@@ -589,7 +588,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
     // clear work from ThreadLocal after splits generated in case of thread is reused in pool.
     Utilities.clearWorkMapForConf(job);
 
-    LOG.info("Number of all splits " + result.size());
+    LOG.info("Number of all splits {}", result.size());
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
     return result.toArray(new InputSplit[result.size()]);
   }
@@ -691,7 +690,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
         retLists.add(split);
         long splitgLength = split.getLength();
         if (size + splitgLength >= targetSize) {
-          LOG.info("Sample alias " + entry.getValue() + " using " + (i + 1) + "splits");
+          LOG.info("Sample alias {} using {} splits", entry.getValue(), i + 1);
           if (size + splitgLength > targetSize) {
             ((InputSplitShim) split).shrinkSplit(targetSize - size);
           }
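
HoodieCombineHiveInputFormat declares its logger from a String (CLASS_NAME) rather than a
Class; LoggerFactory.getLogger offers both overloads, so only the factory call changes.
A sketch of the two equivalent declarations:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LoggerNameSketch {
      private static final String CLASS_NAME = LoggerNameSketch.class.getName();
      // both resolve to a logger named after the class
      public static final Logger BY_NAME = LoggerFactory.getLogger(CLASS_NAME);
      public static final Logger BY_CLASS = LoggerFactory.getLogger(LoggerNameSketch.class);
    }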
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index d7b50d4..6c7bab1 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -45,11 +45,11 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -84,7 +84,7 @@ public abstract class AbstractRealtimeRecordReader {
   // Default file path prefix for spillable file
   public static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/";
 
-  private static final Logger LOG = LogManager.getLogger(AbstractRealtimeRecordReader.class);
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractRealtimeRecordReader.class);
 
   protected final HoodieRealtimeFileSplit split;
   protected final JobConf jobConf;
@@ -98,12 +98,12 @@ public abstract class AbstractRealtimeRecordReader {
   public AbstractRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job) {
     this.split = split;
     this.jobConf = job;
-    LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
-    LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
-    LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
+    LOG.info("cfg ==> {}", job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
+    LOG.info("columnIds ==> {}", job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+    LOG.info("partitioningColumns ==> {}", job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
     try {
       this.usesCustomPayload = usesCustomPayload();
-      LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
+      LOG.info("usesCustomPayload ==> {}", this.usesCustomPayload);
       baseFileSchema = readSchema(jobConf, split.getPath());
       init();
     } catch (IOException e) {
@@ -339,10 +339,10 @@ public abstract class AbstractRealtimeRecordReader {
         LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaFilePaths(), jobConf);
     if (schemaFromLogFile == null) {
       writerSchema = new AvroSchemaConverter().convert(baseFileSchema);
-      LOG.debug("Writer Schema From Parquet => " + writerSchema.getFields());
+      LOG.debug("Writer Schema From Parquet => {}", writerSchema.getFields());
     } else {
       writerSchema = schemaFromLogFile;
-      LOG.debug("Writer Schema From Log => " + writerSchema.getFields());
+      LOG.debug("Writer Schema From Log => {}", writerSchema.getFields());
     }
     // Add partitioning fields to writer schema for resulting row to contain null values for these fields
     String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
@@ -359,8 +359,8 @@ public abstract class AbstractRealtimeRecordReader {
     // to null out fields not present before
 
     readerSchema = generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields);
-    LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
-        split.getDeltaFilePaths(), split.getPath(), projectionFields));
+    LOG.info("About to read compacted logs {} for base split {}, projecting cols {}",
+        split.getDeltaFilePaths(), split.getPath(), projectionFields);
   }
 
   private Schema constructHiveOrderedSchema(Schema writerSchema, Map<String, Field> schemaFieldsMap) {
@@ -377,7 +377,7 @@ public abstract class AbstractRealtimeRecordReader {
       } else {
         // Hive has some extra virtual columns like BLOCK__OFFSET__INSIDE__FILE which do not exist in table schema.
         // They will get skipped as they won't be found in the original schema.
-        LOG.debug("Skipping Hive Column => " + columnName);
+        LOG.debug("Skipping Hive Column => {}", columnName);
       }
     }
 
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index f62f288..0320257 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -46,8 +46,8 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -65,7 +65,7 @@ import java.util.stream.Stream;
 @UseFileSplitsFromInputFormat
 public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat implements Configurable {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieParquetRealtimeInputFormat.class);
 
   // These positions have to be deterministic across all tables
   public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
@@ -148,7 +148,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
         throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e);
       }
     });
-    LOG.info("Returning a total splits of " + rtSplits.size());
+    LOG.info("Returning a total splits of {}", rtSplits.size());
     return rtSplits.toArray(new InputSplit[rtSplits.size()]);
   }
 
@@ -180,9 +180,9 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
       conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColNamesPrefix + fieldName);
       conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColIdsPrefix + fieldIndex);
       if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("Adding extra column " + fieldName + ", to enable log merging cols (%s) ids (%s) ",
+        LOG.debug("Adding extra column {}, to enable log merging cols ({}) ids ({}) ", fieldName,
             conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
-            conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)));
+            conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
       }
     }
     return conf;
@@ -210,7 +210,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
     if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') {
       conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, columnIds.substring(1));
       if (LOG.isDebugEnabled()) {
-        LOG.debug("The projection Ids: {" + columnIds + "} start with ','. First comma is removed");
+        LOG.debug("The projection Ids: [{}] start with ','. First comma is removed", columnIds);
       }
     }
     return conf;
@@ -226,9 +226,9 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
     // actual heavy lifting of reading the parquet files happen.
     if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
       synchronized (job) {
-        LOG.info(
-            "Before adding Hoodie columns, Projections :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
-                + ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+        LOG.info("Before adding Hoodie columns, Projections : {}, Ids : {}",
+            job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
+                job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
         if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
           // Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
           // In this case, the projection fields gets removed. Looking at HiveInputFormat implementation, in some cases
@@ -244,7 +244,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
       }
     }
 
-    LOG.info("Creating record reader with readCols :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+    LOG.info("Creating record reader with readCols : {}", job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
         + ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
     // sanity check
     Preconditions.checkArgument(split instanceof HoodieRealtimeFileSplit,
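
One hunk above is only partially converted: the "Creating record reader with readCols"
message fills a {} with the column names but still concatenates the column ids into that
same argument, so that part is built eagerly. A fully parameterized form of the same
statement (a sketch, not part of this commit, reusing LOG and job from the hunk) would be:

    LOG.info("Creating record reader with readCols : {}, Ids : {}",
        job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
        job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));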
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java
index cb8606e..bd02971 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java
@@ -24,8 +24,8 @@ import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -39,7 +39,7 @@ public class HoodieRealtimeRecordReader implements RecordReader<NullWritable, Ar
   public static final String REALTIME_SKIP_MERGE_PROP = "hoodie.realtime.merge.skip";
   // By default, we do merged-reading
   public static final String DEFAULT_REALTIME_SKIP_MERGE = "false";
-  private static final Logger LOG = LogManager.getLogger(HoodieRealtimeRecordReader.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieRealtimeRecordReader.class);
   private final RecordReader<NullWritable, ArrayWritable> reader;
 
   public HoodieRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job,
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index deeaaf4..5ad59af 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Map;
@@ -40,7 +40,7 @@ import java.util.Map;
 class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
     implements RecordReader<NullWritable, ArrayWritable> {
 
-  private static final Logger LOG = LogManager.getLogger(AbstractRealtimeRecordReader.class);
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractRealtimeRecordReader.class);
 
   protected final RecordReader<NullWritable, ArrayWritable> parquetReader;
   private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> deltaRecordMap;
@@ -108,8 +108,8 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
         ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(recordToReturn, getHiveSchema());
         Writable[] replaceValue = aWritable.get();
         if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("key %s, base values: %s, log values: %s", key, arrayWritableToString(arrayWritable),
-              arrayWritableToString(aWritable)));
+          LOG.debug("key {}, base values: {}, log values: {}", key, arrayWritableToString(arrayWritable),
+              arrayWritableToString(aWritable));
         }
         Writable[] originalValue = arrayWritable.get();
         try {
@@ -117,8 +117,8 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
           arrayWritable.set(originalValue);
         } catch (RuntimeException re) {
           LOG.error("Got exception when doing array copy", re);
-          LOG.error("Base record :" + arrayWritableToString(arrayWritable));
-          LOG.error("Log record :" + arrayWritableToString(aWritable));
+          LOG.error("Base record : {}", arrayWritableToString(arrayWritable));
+          LOG.error("Log record : {}", arrayWritableToString(aWritable));
           String errMsg = "Base-record :" + arrayWritableToString(arrayWritable)
               + " ,Log-record :" + arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
           throw new RuntimeException(errMsg, re);

[hudi] 02/03: [HUDI-457]Redo hudi-common log statements using SLF4J (#1161)

Posted by pw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch redo-log
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit a0dc09ec06603a17ccf3962d2172c4bb87c1f3b4
Author: Li Jiaq <se...@gmail.com>
AuthorDate: Fri Jan 10 13:06:42 2020 +0800

    [HUDI-457]Redo hudi-common log statements using SLF4J (#1161)
---
 hudi-common/pom.xml                                |  6 ++++
 .../hudi/common/model/HoodieCommitMetadata.java    |  8 ++---
 .../hudi/common/model/HoodiePartitionMetadata.java | 11 +++----
 .../common/model/HoodieRollingStatMetadata.java    |  8 ++---
 .../hudi/common/table/HoodieTableConfig.java       |  8 ++---
 .../hudi/common/table/HoodieTableMetaClient.java   | 16 ++++-----
 .../table/log/AbstractHoodieLogRecordScanner.java  | 29 ++++++++---------
 .../hudi/common/table/log/HoodieLogFileReader.java | 12 +++----
 .../hudi/common/table/log/HoodieLogFormat.java     | 13 ++++----
 .../common/table/log/HoodieLogFormatReader.java    |  8 ++---
 .../common/table/log/HoodieLogFormatWriter.java    | 23 +++++++------
 .../table/log/HoodieMergedLogRecordScanner.java    | 16 ++++-----
 .../table/timeline/HoodieActiveTimeline.java       | 30 ++++++++---------
 .../table/timeline/HoodieArchivedTimeline.java     |  4 ---
 .../table/timeline/HoodieDefaultTimeline.java      |  4 ---
 .../table/view/AbstractTableFileSystemView.java    | 19 +++++------
 .../common/table/view/FileSystemViewManager.java   | 15 ++++-----
 .../table/view/HoodieTableFileSystemView.java      |  8 ++---
 .../IncrementalTimelineSyncFileSystemView.java     | 38 +++++++++++-----------
 .../table/view/PriorityBasedFileSystemView.java    |  6 ++--
 .../view/RemoteHoodieTableFileSystemView.java      |  8 ++---
 .../table/view/RocksDbBasedFileSystemView.java     | 26 +++++++--------
 .../view/SpillableMapBasedFileSystemView.java      | 14 ++++----
 .../apache/hudi/common/util/CompactionUtils.java   |  5 ---
 .../common/util/DFSPropertiesConfiguration.java    |  8 ++---
 .../java/org/apache/hudi/common/util/FSUtils.java  | 12 +++----
 .../hudi/common/util/FailSafeConsistencyGuard.java | 14 ++++----
 .../common/util/HoodieRecordSizeEstimator.java     |  8 ++---
 .../org/apache/hudi/common/util/RocksDBDAO.java    | 18 +++++-----
 .../hudi/common/util/TimelineDiffHelper.java       | 10 +++---
 .../hudi/common/util/collection/DiskBasedMap.java  | 11 +++----
 .../util/collection/ExternalSpillableMap.java      | 10 +++---
 .../common/util/queue/BoundedInMemoryExecutor.java |  6 ++--
 .../common/util/queue/BoundedInMemoryQueue.java    |  6 ++--
 .../util/queue/FunctionBasedQueueProducer.java     |  6 ++--
 .../util/queue/IteratorBasedQueueProducer.java     |  6 ++--
 .../hudi/common/minicluster/HdfsTestService.java   | 10 +++---
 .../common/minicluster/ZookeeperTestService.java   | 10 +++---
 .../table/view/TestHoodieTableFileSystemView.java  |  8 ++---
 39 files changed, 232 insertions(+), 246 deletions(-)

diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml
index c9aaf7a..f153119 100644
--- a/hudi-common/pom.xml
+++ b/hudi-common/pom.xml
@@ -166,6 +166,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>com.github.stefanbirkner</groupId>
       <artifactId>system-rules</artifactId>
       <version>1.16.0</version>
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 475f75c..9a69545 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -25,8 +25,8 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.PropertyAccessor;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -43,7 +43,7 @@ import java.util.Map;
 public class HoodieCommitMetadata implements Serializable {
 
   public static final String SCHEMA_KEY = "schema";
-  private static final Logger LOG = LogManager.getLogger(HoodieCommitMetadata.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieCommitMetadata.class);
   protected Map<String, List<HoodieWriteStat>> partitionToWriteStats;
   protected Boolean compacted;
 
@@ -118,7 +118,7 @@ public class HoodieCommitMetadata implements Serializable {
 
   public String toJsonString() throws IOException {
     if (partitionToWriteStats.containsKey(null)) {
-      LOG.info("partition path is null for " + partitionToWriteStats.get(null));
+      LOG.info("partition path is null for {}", partitionToWriteStats.get(null));
       partitionToWriteStats.remove(null);
     }
     return getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index 013869c..7fd6cda 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -24,8 +24,8 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Properties;
@@ -51,7 +51,7 @@ public class HoodiePartitionMetadata {
 
   private final FileSystem fs;
 
-  private static final Logger LOG = LogManager.getLogger(HoodiePartitionMetadata.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodiePartitionMetadata.class);
 
   /**
    * Construct metadata from existing partition.
@@ -101,8 +101,7 @@ public class HoodiePartitionMetadata {
         fs.rename(tmpMetaPath, metaPath);
       }
     } catch (IOException ioe) {
-      LOG.warn("Error trying to save partition metadata (this is okay, as long as atleast 1 of these succced), "
-          + partitionPath, ioe);
+      LOG.warn("Error trying to save partition metadata (this is okay, as long as atleast 1 of these succced), {}", partitionPath, ioe);
     } finally {
       if (!metafileExists) {
         try {
@@ -111,7 +110,7 @@ public class HoodiePartitionMetadata {
             fs.delete(tmpMetaPath, false);
           }
         } catch (IOException ioe) {
-          LOG.warn("Error trying to clean up temporary files for " + partitionPath, ioe);
+          LOG.warn("Error trying to clean up temporary files for {}", partitionPath, ioe);
         }
       }
     }
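
The warn() calls in the hunk above rely on an SLF4J convention: when the final argument is a
Throwable with no matching {}, it is treated as the exception and logged with its stack
trace, while the preceding arguments fill the placeholders. A self-contained sketch with
illustrative names:

    import java.io.IOException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class TrailingThrowableSketch {
      private static final Logger LOG = LoggerFactory.getLogger(TrailingThrowableSketch.class);

      void save(String partitionPath) {
        try {
          throw new IOException("simulated failure");  // stand-in for the real metadata write
        } catch (IOException ioe) {
          // partitionPath fills the {}; ioe is logged as the exception, stack trace included
          LOG.warn("Error trying to save partition metadata for {}", partitionPath, ioe);
        }
      }
    }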
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRollingStatMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRollingStatMetadata.java
index bd1ef94..0459eb5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRollingStatMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRollingStatMetadata.java
@@ -18,8 +18,8 @@
 
 package org.apache.hudi.common.model;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -31,7 +31,7 @@ import java.util.Map;
  */
 public class HoodieRollingStatMetadata implements Serializable {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieRollingStatMetadata.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieRollingStatMetadata.class);
   protected Map<String, Map<String, HoodieRollingStat>> partitionToRollingStats;
   private String actionType = "DUMMY_ACTION";
   public static final String ROLLING_STAT_METADATA_KEY = "ROLLING_STAT";
@@ -78,7 +78,7 @@ public class HoodieRollingStatMetadata implements Serializable {
 
   public String toJsonString() throws IOException {
     if (partitionToRollingStats.containsKey(null)) {
-      LOG.info("partition path is null for " + partitionToRollingStats.get(null));
+      LOG.info("partition path is null for {}", partitionToRollingStats.get(null));
       partitionToRollingStats.remove(null);
     }
     return HoodieCommitMetadata.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index cc950f7..e08a3b8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -28,8 +28,8 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -48,7 +48,7 @@ import java.util.stream.Collectors;
  */
 public class HoodieTableConfig implements Serializable {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieTableConfig.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieTableConfig.class);
 
   public static final String HOODIE_PROPERTIES_FILE = "hoodie.properties";
   public static final String HOODIE_TABLE_NAME_PROP_NAME = "hoodie.table.name";
@@ -71,7 +71,7 @@ public class HoodieTableConfig implements Serializable {
   public HoodieTableConfig(FileSystem fs, String metaPath) {
     Properties props = new Properties();
     Path propertyPath = new Path(metaPath, HOODIE_PROPERTIES_FILE);
-    LOG.info("Loading dataset properties from " + propertyPath);
+    LOG.info("Loading dataset properties from {}", propertyPath);
     try {
       try (FSDataInputStream inputStream = fs.open(propertyPath)) {
         props.load(inputStream);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 489b204..6f5feae 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -39,8 +39,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -66,7 +66,7 @@ import java.util.stream.Stream;
  */
 public class HoodieTableMetaClient implements Serializable {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieTableMetaClient.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieTableMetaClient.class);
   public static String METAFOLDER_NAME = ".hoodie";
   public static String TEMPFOLDER_NAME = METAFOLDER_NAME + File.separator + ".temp";
   public static String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + File.separator + ".aux";
@@ -97,7 +97,7 @@ public class HoodieTableMetaClient implements Serializable {
   public HoodieTableMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad,
       ConsistencyGuardConfig consistencyGuardConfig, Option<TimelineLayoutVersion> layoutVersion)
       throws DatasetNotFoundException {
-    LOG.info("Loading HoodieTableMetaClient from " + basePath);
+    LOG.info("Loading HoodieTableMetaClient from {}", basePath);
     this.basePath = basePath;
     this.consistencyGuardConfig = consistencyGuardConfig;
     this.hadoopConf = new SerializableConfiguration(conf);
@@ -110,9 +110,9 @@ public class HoodieTableMetaClient implements Serializable {
     this.tableType = tableConfig.getTableType();
     this.timelineLayoutVersion = layoutVersion.orElse(tableConfig.getTimelineLayoutVersion());
     this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad;
-    LOG.info("Finished Loading Table of type " + tableType + "(version=" + timelineLayoutVersion + ") from " + basePath);
+    LOG.info("Finished Loading Table of type {}(version={}) from {}", tableType, timelineLayoutVersion, basePath);
     if (loadActiveTimelineOnLoad) {
-      LOG.info("Loading Active commit timeline for " + basePath);
+      LOG.info("Loading Active commit timeline for {}", basePath);
       getActiveTimeline();
     }
   }
@@ -324,7 +324,7 @@ public class HoodieTableMetaClient implements Serializable {
    */
   public static HoodieTableMetaClient initDatasetAndGetMetaClient(Configuration hadoopConf, String basePath,
       Properties props) throws IOException {
-    LOG.info("Initializing " + basePath + " as hoodie dataset " + basePath);
+    LOG.info("Initializing {} as hoodie dataset", basePath);
     Path basePathDir = new Path(basePath);
     final FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
     if (!fs.exists(basePathDir)) {
@@ -361,7 +361,7 @@ public class HoodieTableMetaClient implements Serializable {
     // We should not use fs.getConf as this might be different from the original configuration
     // used to create the fs in unit tests
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath);
-    LOG.info("Finished initializing Table of type " + metaClient.getTableConfig().getTableType() + " from " + basePath);
+    LOG.info("Finished initializing Table of type {} from {}", metaClient.getTableConfig().getTableType(), basePath);
     return metaClient;
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
index 42f1f9a..2e783f3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
@@ -36,8 +36,8 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -66,7 +66,7 @@ import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlo
  */
 public abstract class AbstractHoodieLogRecordScanner {
 
-  private static final Logger LOG = LogManager.getLogger(AbstractHoodieLogRecordScanner.class);
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractHoodieLogRecordScanner.class);
 
   // Reader schema for the records
   protected final Schema readerSchema;
@@ -131,7 +131,7 @@ public abstract class AbstractHoodieLogRecordScanner {
       Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
       while (logFormatReaderWrapper.hasNext()) {
         HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
-        LOG.info("Scanning log file " + logFile);
+        LOG.info("Scanning log file {}", logFile);
         scannedLogFiles.add(logFile);
         totalLogFiles.set(scannedLogFiles.size());
         // Use the HoodieLogFileReader to iterate through the blocks in the log file
@@ -145,7 +145,7 @@ public abstract class AbstractHoodieLogRecordScanner {
         }
         switch (r.getBlockType()) {
           case AVRO_DATA_BLOCK:
-            LOG.info("Reading a data block from file " + logFile.getPath());
+            LOG.info("Reading a data block from file {}", logFile.getPath());
             if (isNewInstantBlock(r) && !readBlocksLazily) {
               // If this is an avro data block belonging to a different commit/instant,
               // then merge the last blocks and records into the main result
@@ -155,7 +155,7 @@ public abstract class AbstractHoodieLogRecordScanner {
             currentInstantLogBlocks.push(r);
             break;
           case DELETE_BLOCK:
-            LOG.info("Reading a delete block from file " + logFile.getPath());
+            LOG.info("Reading a delete block from file {}", logFile.getPath());
             if (isNewInstantBlock(r) && !readBlocksLazily) {
               // If this is a delete data block belonging to a different commit/instant,
               // then merge the last blocks and records into the main result
@@ -177,7 +177,7 @@ public abstract class AbstractHoodieLogRecordScanner {
             // written per ingestion batch for a file but in reality we need to rollback (B1 & B2)
             // The following code ensures the same rollback block (R1) is used to rollback
             // both B1 & B2
-            LOG.info("Reading a command block from file " + logFile.getPath());
+            LOG.info("Reading a command block from file {}", logFile.getPath());
             // This is a command block - take appropriate action based on the command
             HoodieCommandBlock commandBlock = (HoodieCommandBlock) r;
             String targetInstantForCommandBlock =
@@ -196,34 +196,33 @@ public abstract class AbstractHoodieLogRecordScanner {
                   HoodieLogBlock lastBlock = currentInstantLogBlocks.peek();
                   // handle corrupt blocks separately since they may not have metadata
                   if (lastBlock.getBlockType() == CORRUPT_BLOCK) {
-                    LOG.info("Rolling back the last corrupted log block read in " + logFile.getPath());
+                    LOG.info("Rolling back the last corrupted log block read in {}", logFile.getPath());
                     currentInstantLogBlocks.pop();
                     numBlocksRolledBack++;
                   } else if (lastBlock.getBlockType() != CORRUPT_BLOCK
                       && targetInstantForCommandBlock.contentEquals(lastBlock.getLogBlockHeader().get(INSTANT_TIME))) {
                     // rollback last data block or delete block
-                    LOG.info("Rolling back the last log block read in " + logFile.getPath());
+                    LOG.info("Rolling back the last log block read in {}", logFile.getPath());
                     currentInstantLogBlocks.pop();
                     numBlocksRolledBack++;
                   } else if (!targetInstantForCommandBlock
                       .contentEquals(currentInstantLogBlocks.peek().getLogBlockHeader().get(INSTANT_TIME))) {
                     // invalid or extra rollback block
-                    LOG.warn("TargetInstantTime " + targetInstantForCommandBlock
-                        + " invalid or extra rollback command block in " + logFile.getPath());
+                    LOG.warn("TargetInstantTime {} invalid or extra rollback command block in {}", targetInstantForCommandBlock, logFile.getPath());
                     break;
                   } else {
                     // this should not happen ideally
-                    LOG.warn("Unable to apply rollback command block in " + logFile.getPath());
+                    LOG.warn("Unable to apply rollback command block in {}", logFile.getPath());
                   }
                 }
-                LOG.info("Number of applied rollback blocks " + numBlocksRolledBack);
+                LOG.info("Number of applied rollback blocks {}", numBlocksRolledBack);
                 break;
               default:
                 throw new UnsupportedOperationException("Command type not yet supported.");
             }
             break;
           case CORRUPT_BLOCK:
-            LOG.info("Found a corrupt block in " + logFile.getPath());
+            LOG.info("Found a corrupt block in {}", logFile.getPath());
             totalCorruptBlocks.incrementAndGet();
             // If there is a corrupt block - we will assume that this was the next data block
             currentInstantLogBlocks.push(r);
@@ -297,7 +296,7 @@ public abstract class AbstractHoodieLogRecordScanner {
    */
   private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> lastBlocks, int numLogFilesSeen) throws Exception {
     while (!lastBlocks.isEmpty()) {
-      LOG.info("Number of remaining logblocks to merge " + lastBlocks.size());
+      LOG.info("Number of remaining logblocks to merge {}", lastBlocks.size());
       // poll the element at the bottom of the stack since that's the order it was inserted
       HoodieLogBlock lastBlock = lastBlocks.pollLast();
       switch (lastBlock.getBlockType()) {
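
For reference, the parameterized style used throughout these hunks defers message formatting until the target level is known to be enabled, whereas the concatenation it replaces always builds the string. A minimal, self-contained sketch of the pattern (class name and path are illustrative, not part of the patch; assumes slf4j-api plus some binding on the classpath):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative only: shows the placeholder style adopted in this patch.
    public class PlaceholderLoggingSketch {
      private static final Logger LOG = LoggerFactory.getLogger(PlaceholderLoggingSketch.class);

      public static void main(String[] args) {
        String path = "/tmp/.f1_20200113.log.1"; // hypothetical log file path
        // Old style: the argument string is always built, even if INFO is disabled.
        LOG.info("Reading a delete block from file " + path);
        // New style: formatting is skipped entirely when INFO is disabled.
        LOG.info("Reading a delete block from file {}", path);
      }
    }
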
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index 354f809..994941a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -38,8 +38,8 @@ import org.apache.hadoop.fs.BufferedFSInputStream;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -54,7 +54,7 @@ import java.util.Map;
 class HoodieLogFileReader implements HoodieLogFormat.Reader {
 
   public static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024; // 16 MB
-  private static final Logger LOG = LogManager.getLogger(HoodieLogFileReader.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieLogFileReader.class);
 
   private final FSDataInputStream inputStream;
   private final HoodieLogFile logFile;
@@ -113,7 +113,7 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
         try {
           close();
         } catch (Exception e) {
-          LOG.warn("unable to close input stream for log file " + logFile, e);
+          LOG.warn("unable to close input stream for log file {}", logFile, e);
           // fail silently for any sort of exception
         }
       }
@@ -211,12 +211,12 @@ class HoodieLogFileReader implements HoodieLogFormat.Reader {
   }
 
   private HoodieLogBlock createCorruptBlock() throws IOException {
-    LOG.info("Log " + logFile + " has a corrupted block at " + inputStream.getPos());
+    LOG.info("Log {} has a corrupted block at {}", logFile, inputStream.getPos());
     long currentPos = inputStream.getPos();
     long nextBlockOffset = scanForNextAvailableBlockOffset();
     // Rewind to the initial start and read corrupted bytes till the nextBlockOffset
     inputStream.seek(currentPos);
-    LOG.info("Next available block in " + logFile + " starts at " + nextBlockOffset);
+    LOG.info("Next available block in {} starts at {}", logFile, nextBlockOffset);
     int corruptedBlockSize = (int) (nextBlockOffset - currentPos);
     long contentPosition = inputStream.getPos();
     byte[] corruptedBytes = HoodieLogBlock.readOrSkipContent(inputStream, corruptedBlockSize, readBlockLazily);
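
One behavioural note worth illustrating: in SLF4J 1.6+, a Throwable passed as the last argument after the placeholder arguments is still recorded as the attached exception, so conversions like the close() warning above do not lose the stack trace. A small sketch under that assumption (names here are illustrative):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative only: a trailing Throwable after the placeholder arguments
    // is rendered as the exception, not formatted into the message.
    public class ThrowableLoggingSketch {
      private static final Logger LOG = LoggerFactory.getLogger(ThrowableLoggingSketch.class);

      public static void main(String[] args) {
        String logFile = "/tmp/.f1_20200113.log.1"; // hypothetical
        try {
          throw new java.io.IOException("stream already closed");
        } catch (Exception e) {
          // {} is filled with logFile; e is logged with its stack trace.
          LOG.warn("unable to close input stream for log file {}", logFile, e);
        }
      }
    }
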
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
index a7165fb..69228ca 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
@@ -27,8 +27,8 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -104,7 +104,7 @@ public interface HoodieLogFormat {
    */
   class WriterBuilder {
 
-    private static final Logger LOG = LogManager.getLogger(WriterBuilder.class);
+    private static final Logger LOG = LoggerFactory.getLogger(WriterBuilder.class);
     // Default max log file size 512 MB
     public static final long DEFAULT_SIZE_THRESHOLD = 512 * 1024 * 1024L;
 
@@ -210,7 +210,7 @@ public interface HoodieLogFormat {
       }
 
       if (logVersion == null) {
-        LOG.info("Computing the next log version for " + logFileId + " in " + parentPath);
+        LOG.info("Computing the next log version for {} in {}", logFileId, parentPath);
         Option<Pair<Integer, String>> versionAndWriteToken =
             FSUtils.getLatestLogVersion(fs, parentPath, logFileId, fileExtension, commitTime);
         if (versionAndWriteToken.isPresent()) {
@@ -222,8 +222,7 @@ public interface HoodieLogFormat {
           // Use rollover write token as write token to create new log file with tokens
           logWriteToken = rolloverLogWriteToken;
         }
-        LOG.info("Computed the next log version for " + logFileId + " in " + parentPath + " as " + logVersion
-            + " with write-token " + logWriteToken);
+        LOG.info("Computed the next log version for {} in {} as {} with write-token {}", logFileId, parentPath, logVersion, logWriteToken);
       }
 
       if (logWriteToken == null) {
@@ -234,7 +233,7 @@ public interface HoodieLogFormat {
 
       Path logPath = new Path(parentPath,
           FSUtils.makeLogFileName(logFileId, fileExtension, commitTime, logVersion, logWriteToken));
-      LOG.info("HoodieLogFile on path " + logPath);
+      LOG.info("HoodieLogFile on path {}", logPath);
       HoodieLogFile logFile = new HoodieLogFile(logPath);
 
       if (bufferSize == null) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
index a5834d2..b7bb7ae 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
@@ -24,8 +24,8 @@ import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -46,7 +46,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
   private final boolean reverseLogReader;
   private int bufferSize;
 
-  private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieLogFormatReader.class);
 
   HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, Schema readerSchema, boolean readBlocksLazily,
       boolean reverseLogReader, int bufferSize) throws IOException {
@@ -103,7 +103,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
       } catch (IOException io) {
         throw new HoodieIOException("unable to initialize read with log file ", io);
       }
-      LOG.info("Moving to the next reader for logfile " + currentReader.getLogFile());
+      LOG.info("Moving to the next reader for logfile {}", currentReader.getLogFile());
       return this.currentReader.hasNext();
     }
     return false;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index b8d5f89..25ec235 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -34,8 +34,8 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -44,7 +44,7 @@ import java.io.IOException;
  */
 public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieLogFormatWriter.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieLogFormatWriter.class);
 
   private HoodieLogFile logFile;
   private final FileSystem fs;
@@ -76,7 +76,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
     if (fs.exists(path)) {
       boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme());
       if (isAppendSupported) {
-        LOG.info(logFile + " exists. Appending to existing file");
+        LOG.info("{} exists. Appending to existing file", logFile);
         try {
           this.output = fs.append(path, bufferSize);
         } catch (RemoteException e) {
@@ -93,11 +93,11 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
       }
       if (!isAppendSupported) {
         this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
-        LOG.info("Append not supported.. Rolling over to " + logFile);
+        LOG.info("Append not supported.. Rolling over to {}", logFile);
         createNewFile();
       }
     } else {
-      LOG.info(logFile + " does not exist. Create a new file");
+      LOG.info("{} does not exist. Create a new file", logFile);
       // Block size does not matter as we will always manually autoflush
       createNewFile();
     }
@@ -180,8 +180,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
     if (getCurrentSize() > sizeThreshold) {
       // TODO - make an end marker which seals the old log file (no more appends possible to that
       // file).
-      LOG.info("CurrentSize " + getCurrentSize() + " has reached threshold " + sizeThreshold
-          + ". Rolling over to the next version");
+      LOG.info("CurrentSize {} has reached threshold {}. Rolling over to the next version", getCurrentSize(), sizeThreshold);
       HoodieLogFile newLogFile = logFile.rollOver(fs, rolloverLogWriteToken);
       // close this writer and return the new writer
       close();
@@ -237,7 +236,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
       this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
       createNewFile();
     } else if (e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName())) {
-      LOG.warn("Another task executor writing to the same log file(" + logFile + ". Rolling over");
+      LOG.warn("Another task executor writing to the same log file({}. Rolling over", logFile);
       // Rollover the current log file (since cannot get a stream handle) and create new one
       this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
       createNewFile();
@@ -246,13 +245,13 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
       // this happens when either another task executor writing to this file died or
       // data node is going down. Note that we can only try to recover lease for a DistributedFileSystem.
       // ViewFileSystem unfortunately does not support this operation
-      LOG.warn("Trying to recover log on path " + path);
+      LOG.warn("Trying to recover log on path {}", path);
       if (FSUtils.recoverDFSFileLease((DistributedFileSystem) fs, path)) {
-        LOG.warn("Recovered lease on path " + path);
+        LOG.warn("Recovered lease on path {}", path);
         // try again
         this.output = fs.append(path, bufferSize);
       } else {
-        LOG.warn("Failed to recover lease on path " + path);
+        LOG.warn("Failed to recover lease on path {}", path);
         throw new HoodieException(e);
       }
     } else {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index e6246c4..228af19 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -30,8 +30,8 @@ import org.apache.hudi.exception.HoodieIOException;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Iterator;
@@ -54,7 +54,7 @@ import java.util.Map;
 public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
     implements Iterable<HoodieRecord<? extends HoodieRecordPayload>> {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieMergedLogRecordScanner.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieMergedLogRecordScanner.class);
 
   // Final map of compacted/merged records
   private final ExternalSpillableMap<String, HoodieRecord<? extends HoodieRecordPayload>> records;
@@ -81,12 +81,12 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
       scan();
       this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer();
       this.numMergedRecordsInLog = records.size();
-      LOG.info("MaxMemoryInBytes allowed for compaction => " + maxMemorySizeInBytes);
-      LOG.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => " + records.getInMemoryMapNumEntries());
+      LOG.info("MaxMemoryInBytes allowed for compaction => {}", maxMemorySizeInBytes);
+      LOG.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => {}", records.getInMemoryMapNumEntries());
       LOG.info(
-          "Total size in bytes of MemoryBasedMap in ExternalSpillableMap => " + records.getCurrentInMemoryMapSize());
-      LOG.info("Number of entries in DiskBasedMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries());
-      LOG.info("Size of file spilled to disk => " + records.getSizeOfFileOnDiskInBytes());
+          "Total size in bytes of MemoryBasedMap in ExternalSpillableMap => {}", records.getCurrentInMemoryMapSize());
+      LOG.info("Number of entries in DiskBasedMap in ExternalSpillableMap => {}", records.getDiskBasedMapNumEntries());
+      LOG.info("Size of file spilled to disk => {}", records.getSizeOfFileOnDiskInBytes());
     } catch (IOException e) {
       throw new HoodieIOException("IOException when reading log file ");
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index cdf3764..ff50330 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -31,8 +31,8 @@ import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -68,7 +68,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
           INFLIGHT_SAVEPOINT_EXTENSION, CLEAN_EXTENSION, REQUESTED_CLEAN_EXTENSION, INFLIGHT_CLEAN_EXTENSION,
           INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION}));
 
-  private static final Logger LOG = LogManager.getLogger(HoodieActiveTimeline.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieActiveTimeline.class);
   protected HoodieTableMetaClient metaClient;
   private static AtomicReference<String> lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE));
 
@@ -103,7 +103,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
     // multiple casts will make this lambda serializable -
     // http://docs.oracle.com/javase/specs/jls/se8/html/jls-15.html#jls-15.16
     this.details = (Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails;
-    LOG.info("Loaded instants " + getInstants().collect(Collectors.toList()));
+    LOG.info("Loaded instants {}", getInstants().collect(Collectors.toList()));
   }
 
   public HoodieActiveTimeline(HoodieTableMetaClient metaClient) {
@@ -221,24 +221,24 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
   }
 
   public void createNewInstant(HoodieInstant instant) {
-    LOG.info("Creating a new instant " + instant);
+    LOG.info("Creating a new instant {}", instant);
     // Create the in-flight file
     createFileInMetaPath(instant.getFileName(), Option.empty(), false);
   }
 
   public void saveAsComplete(HoodieInstant instant, Option<byte[]> data) {
-    LOG.info("Marking instant complete " + instant);
+    LOG.info("Marking instant complete {}", instant);
     Preconditions.checkArgument(instant.isInflight(),
-        "Could not mark an already completed instant as complete again " + instant);
+            "Could not mark an already completed instant as complete again " + instant);
     transitionState(instant, HoodieTimeline.getCompletedInstant(instant), data);
-    LOG.info("Completed " + instant);
+    LOG.info("Completed {}", instant);
   }
 
   public HoodieInstant revertToInflight(HoodieInstant instant) {
-    LOG.info("Reverting instant to inflight " + instant);
+    LOG.info("Reverting instant to inflight {}", instant);
     HoodieInstant inflight = HoodieTimeline.getInflightInstant(instant, metaClient.getTableType());
     revertCompleteToInflight(instant, inflight);
-    LOG.info("Reverted " + instant + " to inflight " + inflight);
+    LOG.info("Reverted {} to inflight {}", instant, inflight);
     return inflight;
   }
 
@@ -259,12 +259,12 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
   }
 
   private void deleteInstantFile(HoodieInstant instant) {
-    LOG.info("Deleting instant " + instant);
+    LOG.info("Deleting instant {}", instant);
     Path inFlightCommitFilePath = new Path(metaClient.getMetaPath(), instant.getFileName());
     try {
       boolean result = metaClient.getFs().delete(inFlightCommitFilePath, false);
       if (result) {
-        LOG.info("Removed instant " + instant);
+        LOG.info("Removed instant {}", instant);
       } else {
         throw new HoodieIOException("Could not delete instant " + instant);
       }
@@ -405,12 +405,12 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
         }
       } else {
         // Ensures old state exists in timeline
-        LOG.info("Checking for file exists ?" + new Path(metaClient.getMetaPath(), fromInstant.getFileName()));
+        LOG.info("Checking for file exists ?{}", new Path(metaClient.getMetaPath(), fromInstant.getFileName()));
         Preconditions.checkArgument(metaClient.getFs().exists(new Path(metaClient.getMetaPath(),
             fromInstant.getFileName())));
         // Use Write Once to create Target File
         createImmutableFileInPath(new Path(metaClient.getMetaPath(), toInstant.getFileName()), data);
-        LOG.info("Create new file for toInstant ?" + new Path(metaClient.getMetaPath(), toInstant.getFileName()));
+        LOG.info("Create new file for toInstant ?{}", new Path(metaClient.getMetaPath(), toInstant.getFileName()));
       }
     } catch (IOException e) {
       throw new HoodieIOException("Could not complete " + fromInstant, e);
@@ -491,7 +491,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
       // If the path does not exist, create it first
       if (!metaClient.getFs().exists(fullPath)) {
         if (metaClient.getFs().createNewFile(fullPath)) {
-          LOG.info("Created a new file in meta path: " + fullPath);
+          LOG.info("Created a new file in meta path: {}", fullPath);
         } else {
           throw new HoodieIOException("Failed to create file " + fullPath);
         }
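
Placeholders defer formatting but not argument evaluation, so calls whose arguments are themselves costly to compute (for example collecting a stream of instants into a list, as above) can still be wrapped in an isInfoEnabled()/isDebugEnabled() guard. That guard is a general SLF4J idiom, not something this patch applies; a hedged sketch, with a stand-in list rather than Hudi's timeline API:

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Illustrative only: guard log calls whose arguments are expensive to build.
    public class GuardedLoggingSketch {
      private static final Logger LOG = LoggerFactory.getLogger(GuardedLoggingSketch.class);

      public static void main(String[] args) {
        if (LOG.isInfoEnabled()) {
          // The collect() only runs when INFO is actually enabled.
          List<String> instants = Stream.of("001", "002", "003").collect(Collectors.toList());
          LOG.info("Loaded instants {}", instants);
        }
      }
    }
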
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
index d26a88f..a7a10e4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieArchivedTimeline.java
@@ -26,8 +26,6 @@ import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -54,8 +52,6 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
   private HoodieTableMetaClient metaClient;
   private Map<String, byte[]> readCommits = new HashMap<>();
 
-  private static final Logger LOG = LogManager.getLogger(HoodieArchivedTimeline.class);
-
   public HoodieArchivedTimeline(HoodieTableMetaClient metaClient) {
     // Read back the commits to make sure
     Path archiveLogPath = HoodieArchivedTimeline.getArchiveLogPath(metaClient.getArchivePath());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
index c61355c..4030fce 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
@@ -25,8 +25,6 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
 
 import com.google.common.collect.Sets;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
@@ -47,8 +45,6 @@ import static java.util.Collections.reverse;
  */
 public class HoodieDefaultTimeline implements HoodieTimeline {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieDefaultTimeline.class);
-
   private static final String HASHING_ALGORITHM = "SHA-256";
 
   protected transient Function<HoodieInstant, Option<byte[]>> details;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
index bd7462d..ea05782 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java
@@ -38,8 +38,8 @@ import org.apache.hudi.exception.HoodieIOException;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -66,7 +66,7 @@ import java.util.stream.Stream;
  */
 public abstract class AbstractTableFileSystemView implements SyncableFileSystemView, Serializable {
 
-  private static final Logger LOG = LogManager.getLogger(AbstractTableFileSystemView.class);
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractTableFileSystemView.class);
 
   protected HoodieTableMetaClient metaClient;
 
@@ -123,8 +123,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
       }
     });
     long storePartitionsTs = timer.endTimer();
-    LOG.info("addFilesToView: NumFiles=" + statuses.length + ", FileGroupsCreationTime=" + fgBuildTimeTakenMs
-        + ", StoreTimeTaken=" + storePartitionsTs);
+    LOG.info("addFilesToView: NumFiles={}, FileGroupsCreationTime={}, StoreTimeTaken={}", statuses.length, fgBuildTimeTakenMs, storePartitionsTs);
     return fileGroups;
   }
 
@@ -217,7 +216,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
       if (!isPartitionAvailableInStore(partitionPathStr)) {
         // Not loaded yet
         try {
-          LOG.info("Building file system view for partition (" + partitionPathStr + ")");
+          LOG.info("Building file system view for partition ({})", partitionPathStr);
 
           // Create the path if it does not exist already
           Path partitionPath = FSUtils.getPartitionPath(metaClient.getBasePath(), partitionPathStr);
@@ -236,10 +235,10 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
           throw new HoodieIOException("Failed to list data files in partition " + partitionPathStr, e);
         }
       } else {
-        LOG.debug("View already built for Partition :" + partitionPathStr + ", FOUND is ");
+        LOG.debug("View already built for Partition :{}, FOUND is ", partitionPathStr);
       }
       long endTs = System.currentTimeMillis();
-      LOG.info("Time to load partition (" + partitionPathStr + ") =" + (endTs - beginTs));
+      LOG.info("Time to load partition ({}) = {}", partitionPathStr, (endTs - beginTs));
       return true;
     });
   }
@@ -290,7 +289,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
   protected boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) {
     Option<Pair<String, CompactionOperation>> compactionWithInstantTime =
         getPendingCompactionOperationWithInstant(fileSlice.getFileGroupId());
-    LOG.info("Pending Compaction instant for (" + fileSlice + ") is :" + compactionWithInstantTime);
+    LOG.info("Pending Compaction instant for ({}) is :{}", fileSlice, compactionWithInstantTime);
     return (compactionWithInstantTime.isPresent())
         && fileSlice.getBaseInstantTime().equals(compactionWithInstantTime.get().getKey());
   }
@@ -303,7 +302,7 @@ public abstract class AbstractTableFileSystemView implements SyncableFileSystemV
    */
   protected FileSlice filterDataFileAfterPendingCompaction(FileSlice fileSlice) {
     if (isFileSliceAfterPendingCompaction(fileSlice)) {
-      LOG.info("File Slice (" + fileSlice + ") is in pending compaction");
+      LOG.info("File Slice ({}) is in pending compaction", fileSlice);
       // Data file is filtered out of the file-slice as the corresponding compaction
       // instant not completed yet.
       FileSlice transformed =
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
index e00b2c7..078f394 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
@@ -24,8 +24,8 @@ import org.apache.hudi.common.table.HoodieTimeline;
 import org.apache.hudi.common.table.SyncableFileSystemView;
 import org.apache.hudi.common.util.Functions.Function2;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -51,7 +51,7 @@ import java.util.concurrent.ConcurrentHashMap;
  * clients for querying.
  */
 public class FileSystemViewManager {
-  private static final Logger LOG = LogManager.getLogger(FileSystemViewManager.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FileSystemViewManager.class);
 
   private final SerializableConfiguration conf;
   // The View Storage config used to store file-system views
@@ -126,7 +126,7 @@ public class FileSystemViewManager {
    */
   private static SpillableMapBasedFileSystemView createSpillableMapBasedFileSystemView(SerializableConfiguration conf,
       FileSystemViewStorageConfig viewConf, String basePath) {
-    LOG.info("Creating SpillableMap based view for basePath " + basePath);
+    LOG.info("Creating SpillableMap based view for basePath {}", basePath);
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf.newCopy(), basePath, true);
     HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
     return new SpillableMapBasedFileSystemView(metaClient, timeline, viewConf);
@@ -142,7 +142,7 @@ public class FileSystemViewManager {
    */
   private static HoodieTableFileSystemView createInMemoryFileSystemView(SerializableConfiguration conf,
       FileSystemViewStorageConfig viewConf, String basePath) {
-    LOG.info("Creating InMemory based view for basePath " + basePath);
+    LOG.info("Creating InMemory based view for basePath {}", basePath);
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(conf.newCopy(), basePath, true);
     HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
     return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled());
@@ -158,8 +158,7 @@ public class FileSystemViewManager {
    */
   private static RemoteHoodieTableFileSystemView createRemoteFileSystemView(SerializableConfiguration conf,
       FileSystemViewStorageConfig viewConf, HoodieTableMetaClient metaClient) {
-    LOG.info("Creating remote view for basePath " + metaClient.getBasePath() + ". Server="
-        + viewConf.getRemoteViewServerHost() + ":" + viewConf.getRemoteViewServerPort());
+    LOG.info("Creating remote view for basePath {}. Server={}:{}", metaClient.getBasePath(), viewConf.getRemoteViewServerHost(), viewConf.getRemoteViewServerPort());
     return new RemoteHoodieTableFileSystemView(viewConf.getRemoteViewServerHost(), viewConf.getRemoteViewServerPort(),
         metaClient);
   }
@@ -173,7 +172,7 @@ public class FileSystemViewManager {
    */
   public static FileSystemViewManager createViewManager(final SerializableConfiguration conf,
       final FileSystemViewStorageConfig config) {
-    LOG.info("Creating View Manager with storage type :" + config.getStorageType());
+    LOG.info("Creating View Manager with storage type :{}", config.getStorageType());
     switch (config.getStorageType()) {
       case EMBEDDED_KV_STORE:
         LOG.info("Creating embedded rocks-db based Table View");
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
index dd71124..c9311ad 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTableFileSystemView.java
@@ -29,8 +29,8 @@ import org.apache.hudi.common.util.collection.Pair;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -48,7 +48,7 @@ import java.util.stream.Stream;
  */
 public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystemView {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieTableFileSystemView.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieTableFileSystemView.class);
 
   // mapping from partition paths to file groups contained within them
   protected Map<String, List<HoodieFileGroup>> partitionToFileGroupsMap;
@@ -193,7 +193,7 @@ public class HoodieTableFileSystemView extends IncrementalTimelineSyncFileSystem
 
   @Override
   protected void storePartitionView(String partitionPath, List<HoodieFileGroup> fileGroups) {
-    LOG.info("Adding file-groups for partition :" + partitionPath + ", #FileGroups=" + fileGroups.size());
+    LOG.info("Adding file-groups for partition :{}, #FileGroups={}", partitionPath, fileGroups.size());
     List<HoodieFileGroup> newList = new ArrayList<>(fileGroups);
     partitionToFileGroupsMap.put(partitionPath, newList);
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
index 19209ac..a11b75b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/IncrementalTimelineSyncFileSystemView.java
@@ -42,8 +42,8 @@ import org.apache.hudi.exception.HoodieException;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
@@ -55,7 +55,7 @@ import java.util.stream.Collectors;
  */
 public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTableFileSystemView {
 
-  private static final Logger LOG = LogManager.getLogger(IncrementalTimelineSyncFileSystemView.class);
+  private static final Logger LOG = LoggerFactory.getLogger(IncrementalTimelineSyncFileSystemView.class);
 
   // Allows incremental Timeline syncing
   private final boolean incrementalTimelineSyncEnabled;
@@ -103,7 +103,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
    */
   private void runIncrementalSync(HoodieTimeline timeline, TimelineDiffResult diffResult) {
 
-    LOG.info("Timeline Diff Result is :" + diffResult);
+    LOG.info("Timeline Diff Result is :{}", diffResult);
 
     // First remove pending compaction instants which were completed
     diffResult.getFinishedCompactionInstants().stream().forEach(instant -> {
@@ -144,7 +144,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
    * @param instant Compaction Instant to be removed
    */
   private void removePendingCompactionInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
-    LOG.info("Removing completed compaction instant (" + instant + ")");
+    LOG.info("Removing completed compaction instant ({})", instant);
     HoodieCompactionPlan plan = CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp());
     removePendingCompactionOperations(CompactionUtils.getPendingCompactionOperations(instant, plan)
         .map(instantPair -> Pair.of(instantPair.getValue().getKey(),
@@ -158,7 +158,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
    * @param instant Compaction Instant
    */
   private void addPendingCompactionInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
-    LOG.info("Syncing pending compaction instant (" + instant + ")");
+    LOG.info("Syncing pending compaction instant ({})", instant);
     HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(metaClient, instant.getTimestamp());
     List<Pair<String, CompactionOperation>> pendingOps =
         CompactionUtils.getPendingCompactionOperations(instant, compactionPlan)
@@ -189,13 +189,13 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
    * @param instant Instant
    */
   private void addCommitInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
-    LOG.info("Syncing committed instant (" + instant + ")");
+    LOG.info("Syncing committed instant ({})", instant);
     HoodieCommitMetadata commitMetadata =
         HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
     commitMetadata.getPartitionToWriteStats().entrySet().stream().forEach(entry -> {
       String partition = entry.getKey();
       if (isPartitionAvailableInStore(partition)) {
-        LOG.info("Syncing partition (" + partition + ") of instant (" + instant + ")");
+        LOG.info("Syncing partition ({}) of instant ({})", partition, instant);
         FileStatus[] statuses = entry.getValue().stream().map(p -> {
           FileStatus status = new FileStatus(p.getFileSizeInBytes(), false, 0, 0, 0, 0, null, null, null,
               new Path(String.format("%s/%s", metaClient.getBasePath(), p.getPath())));
@@ -205,10 +205,10 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
             buildFileGroups(statuses, timeline.filterCompletedAndCompactionInstants(), false);
         applyDeltaFileSlicesToPartitionView(partition, fileGroups, DeltaApplyMode.ADD);
       } else {
-        LOG.warn("Skipping partition (" + partition + ") when syncing instant (" + instant + ") as it is not loaded");
+        LOG.warn("Skipping partition ({}) when syncing instant ({}) as it is not loaded", partition, instant);
       }
     });
-    LOG.info("Done Syncing committed instant (" + instant + ")");
+    LOG.info("Done Syncing committed instant ({})", instant);
   }
 
   /**
@@ -218,7 +218,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
    * @param instant Restore Instant
    */
   private void addRestoreInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
-    LOG.info("Syncing restore instant (" + instant + ")");
+    LOG.info("Syncing restore instant ({})", instant);
     HoodieRestoreMetadata metadata =
         AvroUtils.deserializeAvroMetadata(timeline.getInstantDetails(instant).get(), HoodieRestoreMetadata.class);
 
@@ -232,7 +232,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
       removeFileSlicesForPartition(timeline, instant, e.getKey(),
           e.getValue().stream().map(x -> x.getValue()).collect(Collectors.toList()));
     });
-    LOG.info("Done Syncing restore instant (" + instant + ")");
+    LOG.info("Done Syncing restore instant ({})", instant);
   }
 
   /**
@@ -242,14 +242,14 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
    * @param instant Rollback Instant
    */
   private void addRollbackInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
-    LOG.info("Syncing rollback instant (" + instant + ")");
+    LOG.info("Syncing rollback instant ({})", instant);
     HoodieRollbackMetadata metadata =
         AvroUtils.deserializeAvroMetadata(timeline.getInstantDetails(instant).get(), HoodieRollbackMetadata.class);
 
     metadata.getPartitionMetadata().entrySet().stream().forEach(e -> {
       removeFileSlicesForPartition(timeline, instant, e.getKey(), e.getValue().getSuccessDeleteFiles());
     });
-    LOG.info("Done Syncing rollback instant (" + instant + ")");
+    LOG.info("Done Syncing rollback instant ({})", instant);
   }
 
   /**
@@ -259,7 +259,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
    * @param instant Clean instant
    */
   private void addCleanInstant(HoodieTimeline timeline, HoodieInstant instant) throws IOException {
-    LOG.info("Syncing cleaner instant (" + instant + ")");
+    LOG.info("Syncing cleaner instant ({})", instant);
     HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(metaClient, instant);
     cleanMetadata.getPartitionMetadata().entrySet().stream().forEach(entry -> {
       final String basePath = metaClient.getBasePath();
@@ -270,13 +270,13 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
           .collect(Collectors.toList());
       removeFileSlicesForPartition(timeline, instant, entry.getKey(), fullPathList);
     });
-    LOG.info("Done Syncing cleaner instant (" + instant + ")");
+    LOG.info("Done Syncing cleaner instant ({})", instant);
   }
 
   private void removeFileSlicesForPartition(HoodieTimeline timeline, HoodieInstant instant, String partition,
       List<String> paths) {
     if (isPartitionAvailableInStore(partition)) {
-      LOG.info("Removing file slices for partition (" + partition + ") for instant (" + instant + ")");
+      LOG.info("Removing file slices for partition ({}) for instant ({})", partition, instant);
       FileStatus[] statuses = paths.stream().map(p -> {
         FileStatus status = new FileStatus();
         status.setPath(new Path(p));
@@ -286,7 +286,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
           buildFileGroups(statuses, timeline.filterCompletedAndCompactionInstants(), false);
       applyDeltaFileSlicesToPartitionView(partition, fileGroups, DeltaApplyMode.REMOVE);
     } else {
-      LOG.warn("Skipping partition (" + partition + ") when syncing instant (" + instant + ") as it is not loaded");
+      LOG.warn("Skipping partition ({}) when syncing instant ({}) as it is not loaded", partition, instant);
     }
   }
 
@@ -309,7 +309,7 @@ public abstract class IncrementalTimelineSyncFileSystemView extends AbstractTabl
   protected void applyDeltaFileSlicesToPartitionView(String partition, List<HoodieFileGroup> deltaFileGroups,
       DeltaApplyMode mode) {
     if (deltaFileGroups.isEmpty()) {
-      LOG.info("No delta file groups for partition :" + partition);
+      LOG.info("No delta file groups for partition :{}", partition);
       return;
     }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
index e9087db..41966a0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java
@@ -32,8 +32,8 @@ import org.apache.hudi.common.util.Functions.Function3;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.List;
@@ -45,7 +45,7 @@ import java.util.stream.Stream;
  */
 public class PriorityBasedFileSystemView implements SyncableFileSystemView, Serializable {
 
-  private static final Logger LOG = LogManager.getLogger(PriorityBasedFileSystemView.class);
+  private static final Logger LOG = LoggerFactory.getLogger(PriorityBasedFileSystemView.class);
 
   private final SyncableFileSystemView preferredView;
   private final SyncableFileSystemView secondaryView;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index c8e625b..7194ffa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -43,8 +43,8 @@ import com.google.common.base.Preconditions;
 import org.apache.http.client.fluent.Request;
 import org.apache.http.client.fluent.Response;
 import org.apache.http.client.utils.URIBuilder;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -108,7 +108,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
   public static final String INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM = "includependingcompaction";
 
 
-  private static final Logger LOG = LogManager.getLogger(RemoteHoodieTableFileSystemView.class);
+  private static final Logger LOG = LoggerFactory.getLogger(RemoteHoodieTableFileSystemView.class);
 
   private final String serverHost;
   private final int serverPort;
@@ -148,7 +148,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     builder.addParameter(TIMELINE_HASH, timeline.getTimelineHash());
 
     String url = builder.toString();
-    LOG.info("Sending request : (" + url + ")");
+    LOG.info("Sending request : ({})", url);
     Response response = null;
     int timeout = 1000 * 300; // 5 min timeout
     switch (method) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
index 2502cc1..2d100e4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
@@ -34,8 +34,8 @@ import org.apache.hudi.common.util.collection.Pair;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -58,7 +58,7 @@ import java.util.stream.Stream;
  */
 public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSystemView {
 
-  private static final Logger LOG = LogManager.getLogger(RocksDbBasedFileSystemView.class);
+  private static final Logger LOG = LoggerFactory.getLogger(RocksDbBasedFileSystemView.class);
 
   private final FileSystemViewStorageConfig config;
 
@@ -87,7 +87,7 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
   protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline) {
     schemaHelper.getAllColumnFamilies().stream().forEach(rocksDB::addColumnFamily);
     super.init(metaClient, visibleActiveTimeline);
-    LOG.info("Created ROCKSDB based file-system view at " + config.getRocksdbBasePath());
+    LOG.info("Created ROCKSDB based file-system view at {}", config.getRocksdbBasePath());
   }
 
   @Override
@@ -102,7 +102,7 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
         rocksDB.putInBatch(batch, schemaHelper.getColFamilyForPendingCompaction(),
             schemaHelper.getKeyForPendingCompactionLookup(opPair.getValue().getFileGroupId()), opPair);
       });
-      LOG.info("Initializing pending compaction operations. Count=" + batch.count());
+      LOG.info("Initializing pending compaction operations. Count={}", batch.count());
     });
   }
 
@@ -157,8 +157,8 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
 
   @Override
   protected void storePartitionView(String partitionPath, List<HoodieFileGroup> fileGroups) {
-    LOG.info("Resetting and adding new partition (" + partitionPath + ") to ROCKSDB based file-system view at "
-        + config.getRocksdbBasePath() + ", Total file-groups=" + fileGroups.size());
+    LOG.info("Resetting and adding new partition ({}) to ROCKSDB based file-system view at {}, Total file-groups={}",
+        partitionPath, config.getRocksdbBasePath(), fileGroups.size());
 
     String lookupKey = schemaHelper.getKeyForPartitionLookup(partitionPath);
     rocksDB.delete(schemaHelper.getColFamilyForStoredPartitions(), lookupKey);
@@ -184,8 +184,8 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
 
     // record that partition is loaded.
     rocksDB.put(schemaHelper.getColFamilyForStoredPartitions(), lookupKey, Boolean.TRUE);
-    LOG.info("Finished adding new partition (" + partitionPath + ") to ROCKSDB based file-system view at "
-        + config.getRocksdbBasePath() + ", Total file-groups=" + fileGroups.size());
+    LOG.info("Finished adding new partition ({}) to ROCKSDB based file-system view at {}, Total file-groups={}", partitionPath,
+        config.getRocksdbBasePath(), fileGroups.size());
   }
 
   @Override
@@ -202,7 +202,7 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
             return fs;
           } else {
             // First remove the file-slice
-            LOG.info("Removing old Slice in DB. FS=" + oldSlice);
+            LOG.info("Removing old Slice in DB. FS={}", oldSlice);
             rocksDB.deleteInBatch(batch, schemaHelper.getColFamilyForView(),
                 schemaHelper.getKeyForSliceView(fg, oldSlice));
             rocksDB.deleteInBatch(batch, schemaHelper.getColFamilyForView(),
@@ -224,11 +224,11 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
                 deltaLogFiles.entrySet().stream().filter(e -> !logFiles.containsKey(e.getKey()))
                     .forEach(p -> newLogFiles.put(p.getKey(), p.getValue()));
                 newLogFiles.values().stream().forEach(lf -> newFileSlice.addLogFile(lf));
-                LOG.info("Adding back new File Slice after add FS=" + newFileSlice);
+                LOG.info("Adding back new File Slice after add FS={}", newFileSlice);
                 return newFileSlice;
               }
               case REMOVE: {
-                LOG.info("Removing old File Slice =" + fs);
+                LOG.info("Removing old File Slice ={}", fs);
                 FileSlice newFileSlice = new FileSlice(oldSlice.getFileGroupId(), oldSlice.getBaseInstantTime());
                 fs.getDataFile().orElseGet(() -> {
                   oldSlice.getDataFile().ifPresent(df -> newFileSlice.setDataFile(df));
@@ -239,7 +239,7 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
                 // Add remaining log files back
                 logFiles.values().stream().forEach(lf -> newFileSlice.addLogFile(lf));
                 if (newFileSlice.getDataFile().isPresent() || (newFileSlice.getLogFiles().count() > 0)) {
-                  LOG.info("Adding back new file-slice after remove FS=" + newFileSlice);
+                  LOG.info("Adding back new file-slice after remove FS={}", newFileSlice);
                   return newFileSlice;
                 }
                 return null;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
index 3ada17e..3c71fd6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java
@@ -28,8 +28,8 @@ import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
 
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -42,7 +42,7 @@ import java.util.stream.Stream;
  */
 public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {
 
-  private static final Logger LOG = LogManager.getLogger(SpillableMapBasedFileSystemView.class);
+  private static final Logger LOG = LoggerFactory.getLogger(SpillableMapBasedFileSystemView.class);
 
   private final long maxMemoryForFileGroupMap;
   private final long maxMemoryForPendingCompaction;
@@ -66,8 +66,8 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {
   @Override
   protected Map<String, List<HoodieFileGroup>> createPartitionToFileGroups() {
     try {
-      LOG.info("Creating Partition To File groups map using external spillable Map. Max Mem=" + maxMemoryForFileGroupMap
-          + ", BaseDir=" + baseStoreDir);
+      LOG.info("Creating Partition To File groups map using external spillable Map.  Max Mem={}, BaseDir={}",
+          maxMemoryForPendingCompaction, baseStoreDir);
       new File(baseStoreDir).mkdirs();
       return (Map<String, List<HoodieFileGroup>>) (new ExternalSpillableMap<>(maxMemoryForFileGroupMap, baseStoreDir,
           new DefaultSizeEstimator(), new DefaultSizeEstimator<>()));
@@ -80,8 +80,8 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {
   protected Map<HoodieFileGroupId, Pair<String, CompactionOperation>> createFileIdToPendingCompactionMap(
       Map<HoodieFileGroupId, Pair<String, CompactionOperation>> fgIdToPendingCompaction) {
     try {
-      LOG.info("Creating Pending Compaction map using external spillable Map. Max Mem=" + maxMemoryForPendingCompaction
-          + ", BaseDir=" + baseStoreDir);
+      LOG.info("Creating Pending Compaction map using external spillable Map. Max Mem={}, BaseDir={}",
+          maxMemoryForPendingCompaction, baseStoreDir);
       new File(baseStoreDir).mkdirs();
       Map<HoodieFileGroupId, Pair<String, CompactionOperation>> pendingMap = new ExternalSpillableMap<>(
           maxMemoryForPendingCompaction, baseStoreDir, new DefaultSizeEstimator(), new DefaultSizeEstimator<>());
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
index 7d5f786..a8b35d4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CompactionUtils.java
@@ -32,9 +32,6 @@ import org.apache.hudi.common.versioning.compaction.CompactionV1MigrationHandler
 import org.apache.hudi.common.versioning.compaction.CompactionV2MigrationHandler;
 import org.apache.hudi.exception.HoodieException;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
@@ -48,8 +45,6 @@ import java.util.stream.Stream;
  */
 public class CompactionUtils {
 
-  private static final Logger LOG = LogManager.getLogger(CompactionUtils.class);
-
   public static final Integer COMPACTION_METADATA_VERSION_1 = CompactionV1MigrationHandler.VERSION;
   public static final Integer COMPACTION_METADATA_VERSION_2 = CompactionV2MigrationHandler.VERSION;
   public static final Integer LATEST_COMPACTION_METADATA_VERSION = COMPACTION_METADATA_VERSION_2;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/DFSPropertiesConfiguration.java b/hudi-common/src/main/java/org/apache/hudi/common/util/DFSPropertiesConfiguration.java
index f535cac..a6ee606 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/DFSPropertiesConfiguration.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/DFSPropertiesConfiguration.java
@@ -20,8 +20,8 @@ package org.apache.hudi.common.util;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -41,7 +41,7 @@ import java.util.Set;
  */
 public class DFSPropertiesConfiguration {
 
-  private static final Logger LOG = LogManager.getLogger(DFSPropertiesConfiguration.class);
+  private static final Logger LOG = LoggerFactory.getLogger(DFSPropertiesConfiguration.class);
 
   private final FileSystem fs;
 
@@ -87,7 +87,7 @@ public class DFSPropertiesConfiguration {
       BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(file)));
       addProperties(reader);
     } catch (IOException ioe) {
-      LOG.error("Error reading in properies from dfs", ioe);
+      LOG.error("Error reading in properties from dfs", ioe);
       throw new IllegalArgumentException("Cannot read properties from dfs", ioe);
     }
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java
index d9161e5..2860983 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FSUtils.java
@@ -39,8 +39,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -60,7 +60,7 @@ import java.util.stream.Stream;
  */
 public class FSUtils {
 
-  private static final Logger LOG = LogManager.getLogger(FSUtils.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FSUtils.class);
   // Log files are of this pattern - .b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1
   private static final Pattern LOG_FILE_PATTERN =
       Pattern.compile("\\.(.*)_(.*)\\.(.*)\\.([0-9]*)(_(([0-9]*)-([0-9]*)-([0-9]*)))?");
@@ -84,7 +84,7 @@ public class FSUtils {
     // look for all properties, prefixed to be picked up
     for (Entry<String, String> prop : System.getenv().entrySet()) {
       if (prop.getKey().startsWith(HOODIE_ENV_PROPS_PREFIX)) {
-        LOG.info("Picking up value for hoodie env var :" + prop.getKey());
+        LOG.info("Picking up value for hoodie env var :{}", prop.getKey());
         conf.set(prop.getKey().replace(HOODIE_ENV_PROPS_PREFIX, "").replaceAll("_DOT_", "."), prop.getValue());
       }
     }
@@ -461,11 +461,11 @@ public class FSUtils {
    */
   public static boolean recoverDFSFileLease(final DistributedFileSystem dfs, final Path p)
       throws IOException, InterruptedException {
-    LOG.info("Recover lease on dfs file " + p);
+    LOG.info("Recover lease on dfs file {}", p);
     // initiate the recovery
     boolean recovered = false;
     for (int nbAttempt = 0; nbAttempt < MAX_ATTEMPTS_RECOVER_LEASE; nbAttempt++) {
-      LOG.info("Attempt " + nbAttempt + " to recover lease on dfs file " + p);
+      LOG.info("Attempt {} to recover lease on dfs file {}", nbAttempt, p);
       recovered = dfs.recoverLease(p);
       if (recovered) {
         break;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/FailSafeConsistencyGuard.java b/hudi-common/src/main/java/org/apache/hudi/common/util/FailSafeConsistencyGuard.java
index b4a0991..248e8e5 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/FailSafeConsistencyGuard.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/FailSafeConsistencyGuard.java
@@ -22,8 +22,8 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -39,7 +39,7 @@ import java.util.stream.Collectors;
  */
 public class FailSafeConsistencyGuard implements ConsistencyGuard {
 
-  private static final Logger LOG = LogManager.getLogger(FailSafeConsistencyGuard.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FailSafeConsistencyGuard.class);
 
   private final FileSystem fs;
   private final ConsistencyGuardConfig consistencyGuardConfig;
@@ -86,7 +86,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
 
     retryTillSuccess((retryNum) -> {
       try {
-        LOG.info("Trying " + retryNum);
+        LOG.info("Trying {}", retryNum);
         FileStatus[] entries = fs.listStatus(dir);
         List<String> gotFiles = Arrays.stream(entries).map(e -> Path.getPathWithoutSchemeAndAuthority(e.getPath()))
             .map(p -> p.toString()).collect(Collectors.toList());
@@ -95,7 +95,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
 
         switch (event) {
           case DISAPPEAR:
-            LOG.info("Following files are visible" + candidateFiles);
+            LOG.info("Following files are visible {}", candidateFiles);
             // If no candidate files gets removed, it means all of them have disappeared
             return !altered;
           case APPEAR:
@@ -104,7 +104,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
             return candidateFiles.isEmpty();
         }
       } catch (IOException ioe) {
-        LOG.warn("Got IOException waiting for file event. Have tried " + retryNum + " time(s)", ioe);
+        LOG.warn("Got IOException waiting for file event. Have tried {} time(s)", retryNum, ioe);
       }
       return false;
     }, "Timed out waiting for files to become visible");
@@ -176,7 +176,7 @@ public class FailSafeConsistencyGuard implements ConsistencyGuard {
   private void retryTillSuccess(Function<Integer, Boolean> predicate, String timedOutMessage) throws TimeoutException {
     long waitMs = consistencyGuardConfig.getInitialConsistencyCheckIntervalMs();
     int attempt = 0;
-    LOG.info("Max Attempts=" + consistencyGuardConfig.getMaxConsistencyChecks());
+    LOG.info("Max Attempts={}", consistencyGuardConfig.getMaxConsistencyChecks());
     while (attempt < consistencyGuardConfig.getMaxConsistencyChecks()) {
       boolean success = predicate.apply(attempt);
       if (success) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordSizeEstimator.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordSizeEstimator.java
index 387b8d0..269ce49 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordSizeEstimator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordSizeEstimator.java
@@ -23,8 +23,8 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 
 import org.apache.avro.Schema;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Size Estimator for Hoodie record payload.
@@ -33,7 +33,7 @@ import org.apache.log4j.Logger;
  */
 public class HoodieRecordSizeEstimator<T extends HoodieRecordPayload> implements SizeEstimator<HoodieRecord<T>> {
 
-  private static final Logger LOG = LogManager.getLogger(HoodieRecordSizeEstimator.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieRecordSizeEstimator.class);
 
   // Schema used to get GenericRecord from HoodieRecordPayload then convert to bytes and vice-versa
   private final Schema schema;
@@ -50,7 +50,7 @@ public class HoodieRecordSizeEstimator<T extends HoodieRecordPayload> implements
     /** {@link ExternalSpillableMap} **/
     long sizeOfRecord = ObjectSizeCalculator.getObjectSize(hoodieRecord);
     long sizeOfSchema = ObjectSizeCalculator.getObjectSize(schema);
-    LOG.info("SizeOfRecord => " + sizeOfRecord + " SizeOfSchema => " + sizeOfSchema);
+    LOG.info("SizeOfRecord => {} SizeOfSchema => {}", sizeOfRecord, sizeOfSchema);
     return sizeOfRecord;
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBDAO.java b/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBDAO.java
index 59af74b..4fcb16d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBDAO.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/RocksDBDAO.java
@@ -24,8 +24,8 @@ import org.apache.hudi.exception.HoodieIOException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.rocksdb.ColumnFamilyDescriptor;
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.ColumnFamilyOptions;
@@ -55,7 +55,7 @@ import java.util.stream.Stream;
  */
 public class RocksDBDAO {
 
-  private static final Logger LOG = LogManager.getLogger(RocksDBDAO.class);
+  private static final Logger LOG = LoggerFactory.getLogger(RocksDBDAO.class);
 
   private transient ConcurrentHashMap<String, ColumnFamilyHandle> managedHandlesMap;
   private transient ConcurrentHashMap<String, ColumnFamilyDescriptor> managedDescriptorMap;
@@ -86,7 +86,7 @@ public class RocksDBDAO {
    */
   private void init() throws HoodieException {
     try {
-      LOG.info("DELETING RocksDB persisted at " + rocksDBBasePath);
+      LOG.info("DELETING RocksDB persisted at {}", rocksDBBasePath);
       FileIOUtils.deleteDirectory(new File(rocksDBBasePath));
 
       managedHandlesMap = new ConcurrentHashMap<>();
@@ -99,7 +99,7 @@ public class RocksDBDAO {
       dbOptions.setLogger(new org.rocksdb.Logger(dbOptions) {
         @Override
         protected void log(InfoLogLevel infoLogLevel, String logMsg) {
-          LOG.info("From Rocks DB : " + logMsg);
+          LOG.info("From Rocks DB : {}", logMsg);
         }
       });
       final List<ColumnFamilyDescriptor> managedColumnFamilies = loadManagedColumnFamilies(dbOptions);
@@ -138,7 +138,7 @@ public class RocksDBDAO {
       LOG.info("No column family found. Loading default");
       managedColumnFamilies.add(getColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY));
     } else {
-      LOG.info("Loading column families :" + existing.stream().map(String::new).collect(Collectors.toList()));
+      LOG.info("Loading column families :{}", existing.stream().map(String::new).collect(Collectors.toList()));
       managedColumnFamilies
           .addAll(existing.stream().map(RocksDBDAO::getColumnFamilyDescriptor).collect(Collectors.toList()));
     }
@@ -352,8 +352,8 @@ public class RocksDBDAO {
       }
     }
 
-    LOG.info("Prefix Search for (query=" + prefix + ") on " + columnFamilyName + ". Total Time Taken (msec)="
-        + timer.endTimer() + ". Serialization Time taken(micro)=" + timeTakenMicro + ", num entries=" + results.size());
+    LOG.info("Prefix Search for (query={}) on {}. Total Time Taken (msec)={}. Serialization Time taken(micro)={},"
+        + " num entries={}", prefix, columnFamilyName, timer.endTimer(), timeTakenMicro, results.size());
     return results.stream();
   }
 
@@ -366,7 +366,7 @@ public class RocksDBDAO {
    */
   public <T extends Serializable> void prefixDelete(String columnFamilyName, String prefix) {
     Preconditions.checkArgument(!closed);
-    LOG.info("Prefix DELETE (query=" + prefix + ") on " + columnFamilyName);
+    LOG.info("Prefix DELETE (query={}) on {}", prefix, columnFamilyName);
     final RocksIterator it = getRocksDB().newIterator(managedHandlesMap.get(columnFamilyName));
     it.seek(prefix.getBytes());
     // Find first and last keys to be deleted
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/TimelineDiffHelper.java b/hudi-common/src/main/java/org/apache/hudi/common/util/TimelineDiffHelper.java
index 7625bb5..fb49243 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/TimelineDiffHelper.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/TimelineDiffHelper.java
@@ -23,8 +23,8 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
 import org.apache.hudi.common.util.collection.Pair;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -36,7 +36,7 @@ import java.util.stream.Collectors;
  */
 public class TimelineDiffHelper {
 
-  private static final Logger LOG = LogManager.getLogger(TimelineDiffHelper.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TimelineDiffHelper.class);
 
   public static TimelineDiffResult getNewInstantsForIncrementalSync(HoodieTimeline oldTimeline,
       HoodieTimeline newTimeline) {
@@ -64,8 +64,8 @@ public class TimelineDiffHelper {
       if (!lostPendingCompactions.isEmpty()) {
         // If a compaction is unscheduled, fall back to complete refresh of fs view since some log files could have been
         // moved. Its unsafe to incrementally sync in that case.
-        LOG.warn("Some pending compactions are no longer in new timeline (unscheduled ?). They are :"
-            + lostPendingCompactions);
+        LOG.warn("Some pending compactions are no longer in new timeline (unscheduled ?). They are :{}",
+            lostPendingCompactions);
         return TimelineDiffResult.UNSAFE_SYNC_RESULT;
       }
       List<HoodieInstant> finishedCompactionInstants = compactionInstants.stream()
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java
index e764a17..47c0f6a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/DiskBasedMap.java
@@ -25,8 +25,8 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -54,7 +54,7 @@ import java.util.stream.Stream;
  */
 public final class DiskBasedMap<T extends Serializable, R extends Serializable> implements Map<T, R>, Iterable<R> {
 
-  private static final Logger LOG = LogManager.getLogger(DiskBasedMap.class);
+  private static final Logger LOG = LoggerFactory.getLogger(DiskBasedMap.class);
   // Stores the key and corresponding value's latest metadata spilled to disk
   private final Map<T, ValueMetadata> valueMetadataMap;
   // Write only file
@@ -111,9 +111,8 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
       writeOnlyFile.getParentFile().mkdir();
     }
     writeOnlyFile.createNewFile();
-    LOG.info("Spilling to file location " + writeOnlyFile.getAbsolutePath() + " in host ("
-        + InetAddress.getLocalHost().getHostAddress() + ") with hostname (" + InetAddress.getLocalHost().getHostName()
-        + ")");
+    LOG.info("Spilling to file location {} in host ({}) with hostname ({})", writeOnlyFile.getAbsolutePath(),
+        InetAddress.getLocalHost().getHostAddress(), InetAddress.getLocalHost().getHostName());
     // Make sure file is deleted when JVM exits
     writeOnlyFile.deleteOnExit();
     addShutDownHook();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
index 32c41f7..b68cd9a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/ExternalSpillableMap.java
@@ -22,8 +22,8 @@ import org.apache.hudi.common.util.ObjectSizeCalculator;
 import org.apache.hudi.common.util.SizeEstimator;
 import org.apache.hudi.exception.HoodieIOException;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -56,7 +56,7 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
 
   // Find the actual estimated payload size after inserting N records
   private static final int NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE = 100;
-  private static final Logger LOG = LogManager.getLogger(ExternalSpillableMap.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ExternalSpillableMap.class);
   // maximum space allowed in-memory for this map
   private final long maxInMemorySizeInBytes;
   // Map to store key-values in memory until it hits maxInMemorySizeInBytes
@@ -177,7 +177,7 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
         // At first, use the sizeEstimate of a record being inserted into the spillable map.
         // Note, the converter may over estimate the size of a record in the JVM
         this.estimatedPayloadSize = keySizeEstimator.sizeEstimate(key) + valueSizeEstimator.sizeEstimate(value);
-        LOG.info("Estimated Payload size => " + estimatedPayloadSize);
+        LOG.info("Estimated Payload size => {}", estimatedPayloadSize);
       } else if (shouldEstimatePayloadSize && inMemoryMap.size() % NUMBER_OF_RECORDS_TO_ESTIMATE_PAYLOAD_SIZE == 0) {
         // Re-estimate the size of a record by calculating the size of the entire map containing
         // N entries and then dividing by the number of entries present (N). This helps to get a
@@ -186,7 +186,7 @@ public class ExternalSpillableMap<T extends Serializable, R extends Serializable
         this.currentInMemoryMapSize = totalMapSize;
         this.estimatedPayloadSize = totalMapSize / inMemoryMap.size();
         shouldEstimatePayloadSize = false;
-        LOG.info("New Estimated Payload size => " + this.estimatedPayloadSize);
+        LOG.info("New Estimated Payload size => {}", this.estimatedPayloadSize);
       }
       if (!inMemoryMap.containsKey(key)) {
         // TODO : Add support for adjusting payloadSize for updates to the same key
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
index cec9ab6..68fd3cf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryExecutor.java
@@ -23,8 +23,8 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SizeEstimator;
 import org.apache.hudi.exception.HoodieException;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.List;
@@ -44,7 +44,7 @@ import java.util.stream.Collectors;
  */
 public class BoundedInMemoryExecutor<I, O, E> {
 
-  private static final Logger LOG = LogManager.getLogger(BoundedInMemoryExecutor.class);
+  private static final Logger LOG = LoggerFactory.getLogger(BoundedInMemoryExecutor.class);
 
   // Executor service used for launching writer thread.
   private final ExecutorService executorService;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java
index 2c5ce5d..319d552 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/BoundedInMemoryQueue.java
@@ -25,8 +25,8 @@ import org.apache.hudi.exception.HoodieException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -58,7 +58,7 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
   public static final int RECORD_SAMPLING_RATE = 64;
   // maximum records that will be cached
   private static final int RECORD_CACHING_LIMIT = 128 * 1024;
-  private static final Logger LOG = LogManager.getLogger(BoundedInMemoryQueue.class);
+  private static final Logger LOG = LoggerFactory.getLogger(BoundedInMemoryQueue.class);
   // It indicates number of records to cache. We will be using sampled record's average size to
   // determine how many
   // records we should cache and will change (increase/decrease) permits accordingly.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java
index 5496837..c70db12 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/FunctionBasedQueueProducer.java
@@ -18,8 +18,8 @@
 
 package org.apache.hudi.common.util.queue;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.function.Function;
 
@@ -30,7 +30,7 @@ import java.util.function.Function;
  */
 public class FunctionBasedQueueProducer<I> implements BoundedInMemoryQueueProducer<I> {
 
-  private static final Logger LOG = LogManager.getLogger(FunctionBasedQueueProducer.class);
+  private static final Logger LOG = LoggerFactory.getLogger(FunctionBasedQueueProducer.class);
 
   private final Function<BoundedInMemoryQueue<I, ?>, Boolean> producerFunction;
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java
index 3d11f38..c4719a8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/queue/IteratorBasedQueueProducer.java
@@ -18,8 +18,8 @@
 
 package org.apache.hudi.common.util.queue;
 
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
 
@@ -30,7 +30,7 @@ import java.util.Iterator;
  */
 public class IteratorBasedQueueProducer<I> implements BoundedInMemoryQueueProducer<I> {
 
-  private static final Logger LOG = LogManager.getLogger(IteratorBasedQueueProducer.class);
+  private static final Logger LOG = LoggerFactory.getLogger(IteratorBasedQueueProducer.class);
 
   // input iterator for producing items in the buffer.
   private final Iterator<I> inputIterator;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java b/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java
index 9bc9a8d..02ee92a 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/minicluster/HdfsTestService.java
@@ -27,8 +27,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -38,7 +38,7 @@ import java.io.IOException;
  */
 public class HdfsTestService {
 
-  private static final Logger LOG = LogManager.getLogger(HdfsTestService.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HdfsTestService.class);
 
   /**
    * Configuration settings.
@@ -72,7 +72,7 @@ public class HdfsTestService {
     // If clean, then remove the work dir so we can start fresh.
     String localDFSLocation = getDFSLocation(workDir);
     if (format) {
-      LOG.info("Cleaning HDFS cluster data at: " + localDFSLocation + " and starting fresh.");
+      LOG.info("Cleaning HDFS cluster data at: {} and starting fresh.", localDFSLocation);
       File file = new File(localDFSLocation);
       FileIOUtils.deleteDirectory(file);
     }
@@ -115,7 +115,7 @@ public class HdfsTestService {
   private static Configuration configureDFSCluster(Configuration config, String localDFSLocation, String bindIP,
       int namenodeRpcPort, int datanodePort, int datanodeIpcPort, int datanodeHttpPort) {
 
-    LOG.info("HDFS force binding to ip: " + bindIP);
+    LOG.info("HDFS force binding to ip: {}", bindIP);
     config.set(DFSConfigKeys.FS_DEFAULT_NAME_KEY, "hdfs://" + bindIP + ":" + namenodeRpcPort);
     config.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, bindIP + ":" + datanodePort);
     config.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, bindIP + ":" + datanodeIpcPort);
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/minicluster/ZookeeperTestService.java b/hudi-common/src/test/java/org/apache/hudi/common/minicluster/ZookeeperTestService.java
index 670be44..c2dadd3 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/minicluster/ZookeeperTestService.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/minicluster/ZookeeperTestService.java
@@ -22,8 +22,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.io.Files;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnLog;
@@ -53,7 +53,7 @@ import java.net.Socket;
  */
 public class ZookeeperTestService {
 
-  private static final Logger LOG = LogManager.getLogger(ZookeeperTestService.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ZookeeperTestService.class);
 
   private static final int TICK_TIME = 2000;
   private static final int CONNECTION_TIMEOUT = 30000;
@@ -103,7 +103,7 @@ public class ZookeeperTestService {
 
     // NOTE: Changed from the original, where InetSocketAddress was
     // originally created to bind to the wildcard IP, we now configure it.
-    LOG.info("Zookeeper force binding to: " + this.bindIP);
+    LOG.info("Zookeeper force binding to: {}", this.bindIP);
     standaloneServerFactory.configure(new InetSocketAddress(bindIP, clientPort), 1000);
 
     // Start up this ZK server
@@ -120,7 +120,7 @@ public class ZookeeperTestService {
     }
 
     started = true;
-    LOG.info("Zookeeper Minicluster service started on client port: " + clientPort);
+    LOG.info("Zookeeper Minicluster service started on client port: {}", clientPort);
     return zooKeeperServer;
   }
 
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
index 8828402..6dc9d8a 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestHoodieTableFileSystemView.java
@@ -43,8 +43,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -71,7 +71,7 @@ import static org.junit.Assert.assertTrue;
 @SuppressWarnings("ResultOfMethodCallIgnored")
 public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
 
-  private static final Logger LOG = LogManager.getLogger(TestHoodieTableFileSystemView.class);
+  private static final Logger LOG = LoggerFactory.getLogger(TestHoodieTableFileSystemView.class);
 
   private static String TEST_WRITE_TOKEN = "1-0-1";
 
@@ -489,7 +489,7 @@ public class TestHoodieTableFileSystemView extends HoodieCommonTestHarness {
     roView.getAllDataFiles(partitionPath);
 
     fileSliceList = rtView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
-    LOG.info("FILESLICE LIST=" + fileSliceList);
+    LOG.info("FILESLICE LIST={}", fileSliceList);
     dataFiles = fileSliceList.stream().map(FileSlice::getDataFile).filter(Option::isPresent).map(Option::get)
         .collect(Collectors.toList());
     assertEquals("Expect only one data-files in latest view as there is only one file-group", 1, dataFiles.size());

[hudi] 01/03: [HUDI-459] Redo hudi-hive log statements using SLF4J (#1203)

Posted by pw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch redo-log
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ac105e6d9b6dcd75f8145af6ce03600af40180e0
Author: lamber-ken <la...@163.com>
AuthorDate: Fri Jan 10 09:38:34 2020 +0800

    [HUDI-459] Redo hudi-hive log statements using SLF4J (#1203)
---
 hudi-hive/pom.xml                                  |  5 +++
 .../java/org/apache/hudi/hive/HiveSyncTool.java    | 30 +++++++--------
 .../org/apache/hudi/hive/HoodieHiveClient.java     | 44 +++++++++++-----------
 .../java/org/apache/hudi/hive/util/SchemaUtil.java | 12 +++---
 .../org/apache/hudi/hive/util/HiveTestService.java | 10 ++---
 5 files changed, 53 insertions(+), 48 deletions(-)
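
[Editorial note] The per-file changes below follow the same recipe as the hudi-common and hudi-hadoop-mr commits: declare slf4j-api in the module pom, replace the log4j imports and LogManager.getLogger call with LoggerFactory.getLogger, and rewrite concatenated log messages as parameterized ones. A minimal before/after sketch, with a hypothetical class name rather than an actual Hudi class:

    // Before (log4j 1.x API, eager string concatenation):
    //   import org.apache.log4j.LogManager;
    //   import org.apache.log4j.Logger;
    //   private static final Logger LOG = LogManager.getLogger(ExampleSync.class);
    //   LOG.info("Sync complete for " + tableName);

    // After (SLF4J API, parameterized message):
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ExampleSync {
      private static final Logger LOG = LoggerFactory.getLogger(ExampleSync.class);

      void logSyncComplete(String tableName) {
        // {} placeholders defer formatting until the level check passes.
        LOG.info("Sync complete for {}", tableName);
      }
    }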

diff --git a/hudi-hive/pom.xml b/hudi-hive/pom.xml
index c552b70..1ab2533 100644
--- a/hudi-hive/pom.xml
+++ b/hudi-hive/pom.xml
@@ -49,6 +49,11 @@
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.parquet</groupId>
diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 6bcb697..4029096 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -34,8 +34,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.parquet.schema.MessageType;
 
 import java.util.List;
@@ -52,7 +52,7 @@ import java.util.stream.Collectors;
 @SuppressWarnings("WeakerAccess")
 public class HiveSyncTool {
 
-  private static final Logger LOG = LogManager.getLogger(HiveSyncTool.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HiveSyncTool.class);
   private final HoodieHiveClient hoodieHiveClient;
   public static final String SUFFIX_REALTIME_TABLE = "_rt";
   private final HiveSyncConfig cfg;
@@ -79,7 +79,7 @@ public class HiveSyncTool {
           cfg.tableName = originalTableName;
           break;
         default:
-          LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
+          LOG.error("Unknown table type {}", hoodieHiveClient.getTableType());
           throw new InvalidDatasetException(hoodieHiveClient.getBasePath());
       }
     } catch (RuntimeException re) {
@@ -90,8 +90,8 @@ public class HiveSyncTool {
   }
 
   private void syncHoodieTable(boolean isRealTime) throws ClassNotFoundException {
-    LOG.info("Trying to sync hoodie table " + cfg.tableName + " with base path " + hoodieHiveClient.getBasePath()
-        + " of type " + hoodieHiveClient.getTableType());
+    LOG.info("Trying to sync hoodie table {} with base path {} of type {}",
+        cfg.tableName, hoodieHiveClient.getBasePath(), hoodieHiveClient.getTableType());
 
     // Check if the necessary table exists
     boolean tableExists = hoodieHiveClient.doesTableExist();
@@ -100,20 +100,20 @@ public class HiveSyncTool {
     // Sync schema if needed
     syncSchema(tableExists, isRealTime, schema);
 
-    LOG.info("Schema sync complete. Syncing partitions for " + cfg.tableName);
+    LOG.info("Schema sync complete. Syncing partitions for {}", cfg.tableName);
     // Get the last time we successfully synced partitions
     Option<String> lastCommitTimeSynced = Option.empty();
     if (tableExists) {
       lastCommitTimeSynced = hoodieHiveClient.getLastCommitTimeSynced();
     }
-    LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null"));
+    LOG.info("Last commit time synced was found to be {}", lastCommitTimeSynced.orElse("null"));
     List<String> writtenPartitionsSince = hoodieHiveClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
-    LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());
+    LOG.info("Storage partitions scan complete. Found {}", writtenPartitionsSince.size());
     // Sync the partitions if needed
     syncPartitions(writtenPartitionsSince);
 
     hoodieHiveClient.updateLastCommitTimeSynced();
-    LOG.info("Sync complete for " + cfg.tableName);
+    LOG.info("Sync complete for {}", cfg.tableName);
   }
 
   /**
@@ -126,7 +126,7 @@ public class HiveSyncTool {
   private void syncSchema(boolean tableExists, boolean isRealTime, MessageType schema) throws ClassNotFoundException {
     // Check and sync schema
     if (!tableExists) {
-      LOG.info("Table " + cfg.tableName + " is not found. Creating it");
+      LOG.info("Table {} is not found. Creating it", cfg.tableName);
       if (!isRealTime) {
         // TODO - RO Table for MOR only after major compaction (UnboundedCompaction is default
         // for now)
@@ -150,10 +150,10 @@ public class HiveSyncTool {
       Map<String, String> tableSchema = hoodieHiveClient.getTableSchema();
       SchemaDifference schemaDiff = SchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields);
       if (!schemaDiff.isEmpty()) {
-        LOG.info("Schema difference found for " + cfg.tableName);
+        LOG.info("Schema difference found for {}", cfg.tableName);
         hoodieHiveClient.updateTableDefinition(schema);
       } else {
-        LOG.info("No Schema difference for " + cfg.tableName);
+        LOG.info("No Schema difference for {}", cfg.tableName);
       }
     }
   }
@@ -168,10 +168,10 @@ public class HiveSyncTool {
       List<PartitionEvent> partitionEvents =
           hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
       List<String> newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD);
-      LOG.info("New Partitions " + newPartitions);
+      LOG.info("New Partitions {}", newPartitions);
       hoodieHiveClient.addPartitionsToTable(newPartitions);
       List<String> updatePartitions = filterPartitions(partitionEvents, PartitionEventType.UPDATE);
-      LOG.info("Changed Partitions " + updatePartitions);
+      LOG.info("Changed Partitions {}", updatePartitions);
       hoodieHiveClient.updatePartitionsToTable(updatePartitions);
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to sync partitions for table " + cfg.tableName, e);
diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index d176500..820e59b 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -48,8 +48,8 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.jdbc.HiveDriver;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -87,7 +87,7 @@ public class HoodieHiveClient {
     }
   }
 
-  private static final Logger LOG = LogManager.getLogger(HoodieHiveClient.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieHiveClient.class);
   private final HoodieTableMetaClient metaClient;
   private final HoodieTableType tableType;
   private final PartitionValueExtractor partitionValueExtractor;
@@ -108,7 +108,7 @@ public class HoodieHiveClient {
     // Support both JDBC and metastore based implementations for backwards compatiblity. Future users should
     // disable jdbc and depend on metastore client for all hive registrations
     if (cfg.useJdbc) {
-      LOG.info("Creating hive connection " + cfg.jdbcUrl);
+      LOG.info("Creating hive connection {}", cfg.jdbcUrl);
       createHiveConnection();
     }
     try {
@@ -137,10 +137,10 @@ public class HoodieHiveClient {
    */
   void addPartitionsToTable(List<String> partitionsToAdd) {
     if (partitionsToAdd.isEmpty()) {
-      LOG.info("No partitions to add for " + syncConfig.tableName);
+      LOG.info("No partitions to add for {}", syncConfig.tableName);
       return;
     }
-    LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + syncConfig.tableName);
+    LOG.info("Adding partitions {} to table {}", partitionsToAdd.size(), syncConfig.tableName);
     String sql = constructAddPartitions(partitionsToAdd);
     updateHiveSQL(sql);
   }
@@ -150,10 +150,10 @@ public class HoodieHiveClient {
    */
   void updatePartitionsToTable(List<String> changedPartitions) {
     if (changedPartitions.isEmpty()) {
-      LOG.info("No partitions to change for " + syncConfig.tableName);
+      LOG.info("No partitions to change for {}", syncConfig.tableName);
       return;
     }
-    LOG.info("Changing partitions " + changedPartitions.size() + " on " + syncConfig.tableName);
+    LOG.info("Changing partitions {} on {}", changedPartitions.size(), syncConfig.tableName);
     List<String> sqls = constructChangePartitions(changedPartitions);
     for (String sql : sqls) {
       updateHiveSQL(sql);
@@ -260,7 +260,7 @@ public class HoodieHiveClient {
               .append(HIVE_ESCAPE_CHARACTER).append(syncConfig.tableName)
               .append(HIVE_ESCAPE_CHARACTER).append(" REPLACE COLUMNS(")
               .append(newSchemaStr).append(" )").append(cascadeClause);
-      LOG.info("Updating table definition with " + sqlBuilder);
+      LOG.info("Updating table definition with {}", sqlBuilder);
       updateHiveSQL(sqlBuilder.toString());
     } catch (IOException e) {
       throw new HoodieHiveSyncException("Failed to update table for " + syncConfig.tableName, e);
@@ -271,7 +271,7 @@ public class HoodieHiveClient {
     try {
       String createSQLQuery =
           SchemaUtil.generateCreateDDL(storageSchema, syncConfig, inputFormatClass, outputFormatClass, serdeClass);
-      LOG.info("Creating table with " + createSQLQuery);
+      LOG.info("Creating table with {}", createSQLQuery);
       updateHiveSQL(createSQLQuery);
     } catch (IOException e) {
       throw new HoodieHiveSyncException("Failed to create table " + syncConfig.tableName, e);
@@ -329,7 +329,7 @@ public class HoodieHiveClient {
       schema.putAll(columnsMap);
       schema.putAll(partitionKeysMap);
       final long end = System.currentTimeMillis();
-      LOG.info(String.format("Time taken to getTableSchema: %s ms", (end - start)));
+      LOG.info("Time taken to getTableSchema: {} ms", (end - start));
       return schema;
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to get table schema for : " + syncConfig.tableName, e);
@@ -364,7 +364,7 @@ public class HoodieHiveClient {
           // Get a datafile written and get the schema from that file
           Option<HoodieInstant> lastCompactionCommit =
               metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
-          LOG.info("Found the last compaction commit as " + lastCompactionCommit);
+          LOG.info("Found the last compaction commit as {}", lastCompactionCommit);
 
           Option<HoodieInstant> lastDeltaCommit;
           if (lastCompactionCommit.isPresent()) {
@@ -374,7 +374,7 @@ public class HoodieHiveClient {
             lastDeltaCommit =
                 metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
           }
-          LOG.info("Found the last delta commit " + lastDeltaCommit);
+          LOG.info("Found the last delta commit {}", lastDeltaCommit);
 
           if (lastDeltaCommit.isPresent()) {
             HoodieInstant lastDeltaInstant = lastDeltaCommit.get();
@@ -407,7 +407,7 @@ public class HoodieHiveClient {
             return readSchemaFromLastCompaction(lastCompactionCommit);
           }
         default:
-          LOG.error("Unknown table type " + tableType);
+          LOG.error("Unknown table type {}", tableType);
           throw new InvalidDatasetException(syncConfig.basePath);
       }
     } catch (IOException e) {
@@ -441,7 +441,7 @@ public class HoodieHiveClient {
     MessageType messageType = SchemaUtil.readSchemaFromLogFile(fs, path);
     // Fall back to read the schema from last compaction
     if (messageType == null) {
-      LOG.info("Falling back to read the schema from last compaction " + lastCompactionCommitOpt);
+      LOG.info("Falling back to read the schema from last compaction {}", lastCompactionCommitOpt);
       return readSchemaFromLastCompaction(lastCompactionCommitOpt);
     }
     return messageType;
@@ -451,7 +451,7 @@ public class HoodieHiveClient {
    * Read the parquet schema from a parquet File.
    */
   private MessageType readSchemaFromDataFile(Path parquetFilePath) throws IOException {
-    LOG.info("Reading schema from " + parquetFilePath);
+    LOG.info("Reading schema from {}", parquetFilePath);
     if (!fs.exists(parquetFilePath)) {
       throw new IllegalArgumentException(
           "Failed to read schema from data file " + parquetFilePath + ". File does not exist.");
@@ -482,7 +482,7 @@ public class HoodieHiveClient {
       Statement stmt = null;
       try {
         stmt = connection.createStatement();
-        LOG.info("Executing SQL " + s);
+        LOG.info("Executing SQL {}", s);
         stmt.execute(s);
       } catch (SQLException e) {
         throw new HoodieHiveSyncException("Failed in executing SQL " + s, e);
@@ -513,12 +513,12 @@ public class HoodieHiveClient {
       ss = SessionState.start(configuration);
       hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration);
       final long endTime = System.currentTimeMillis();
-      LOG.info(String.format("Time taken to start SessionState and create Driver: %s ms", (endTime - startTime)));
+      LOG.info("Time taken to start SessionState and create Driver: {} ms", (endTime - startTime));
       for (String sql : sqls) {
         final long start = System.currentTimeMillis();
         responses.add(hiveDriver.run(sql));
         final long end = System.currentTimeMillis();
-        LOG.info(String.format("Time taken to execute [%s]: %s ms", sql, (end - start)));
+        LOG.info("Time taken to execute [{}]: {} ms", sql, (end - start));
       }
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed in executing SQL", e);
@@ -552,7 +552,7 @@ public class HoodieHiveClient {
 
       try {
         this.connection = DriverManager.getConnection(syncConfig.jdbcUrl, syncConfig.hiveUser, syncConfig.hivePass);
-        LOG.info("Successfully established Hive connection to  " + syncConfig.jdbcUrl);
+        LOG.info("Successfully established Hive connection to {}", syncConfig.jdbcUrl);
       } catch (SQLException e) {
         throw new HoodieHiveSyncException("Cannot create hive connection " + getHiveJdbcUrlWithDefaultDBName(), e);
       }
@@ -630,14 +630,14 @@ public class HoodieHiveClient {
   @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
   List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSynced) {
     if (!lastCommitTimeSynced.isPresent()) {
-      LOG.info("Last commit time synced is not known, listing all partitions in " + syncConfig.basePath + ",FS :" + fs);
+      LOG.info("Last commit time synced is not known, listing all partitions in {}, FS :{}", syncConfig.basePath, fs);
       try {
         return FSUtils.getAllPartitionPaths(fs, syncConfig.basePath, syncConfig.assumeDatePartitioning);
       } catch (IOException e) {
         throw new HoodieIOException("Failed to list all partitions in " + syncConfig.basePath, e);
       }
     } else {
-      LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
+      LOG.info("Last commit time synced is {}, Getting commits since then", lastCommitTimeSynced.get());
 
       HoodieTimeline timelineToSync = activeTimeline.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE);
       return timelineToSync.getInstants().map(s -> {
diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java b/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
index d945b58..77c04b5 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
@@ -31,8 +31,8 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.GroupType;
@@ -53,7 +53,7 @@ import java.util.stream.Collectors;
  */
 public class SchemaUtil {
 
-  private static final Logger LOG = LogManager.getLogger(SchemaUtil.class);
+  private static final Logger LOG = LoggerFactory.getLogger(SchemaUtil.class);
   public static final String HIVE_ESCAPE_CHARACTER = "`";
 
   /**
@@ -67,7 +67,7 @@ public class SchemaUtil {
     } catch (IOException e) {
       throw new HoodieHiveSyncException("Failed to convert parquet schema to hive schema", e);
     }
-    LOG.info("Getting schema difference for " + tableSchema + "\r\n\r\n" + newTableSchema);
+    LOG.info("Getting schema difference for tableSchema :{}, newTableSchema :{}", tableSchema, newTableSchema);
     SchemaDifference.Builder schemaDiffBuilder = SchemaDifference.newBuilder(storageSchema, tableSchema);
     Set<String> tableColumns = Sets.newHashSet();
 
@@ -85,7 +85,7 @@ public class SchemaUtil {
             continue;
           }
           // We will log this and continue. Hive schema is a superset of all parquet schemas
-          LOG.warn("Ignoring table column " + fieldName + " as its not present in the parquet schema");
+          LOG.warn("Ignoring table column {} as its not present in the parquet schema", fieldName);
           continue;
         }
         tableColumnType = tableColumnType.replaceAll("\\s+", "");
@@ -112,7 +112,7 @@ public class SchemaUtil {
         schemaDiffBuilder.addTableColumn(entry.getKey(), entry.getValue());
       }
     }
-    LOG.info("Difference between schemas: " + schemaDiffBuilder.build().toString());
+    LOG.info("Difference between schemas: {}", schemaDiffBuilder.build().toString());
 
     return schemaDiffBuilder.build();
   }
diff --git a/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java b/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java
index d82c33b..5118d19 100644
--- a/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java
+++ b/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java
@@ -35,8 +35,8 @@ import org.apache.hadoop.hive.metastore.TUGIBasedProcessor;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.thrift.TUGIContainingTransport;
 import org.apache.hive.service.server.HiveServer2;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.server.TServer;
@@ -59,7 +59,7 @@ import java.util.concurrent.Executors;
 
 public class HiveTestService {
 
-  private static final Logger LOG = LogManager.getLogger(HiveTestService.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HiveTestService.class);
 
   private static final int CONNECTION_TIMEOUT = 30000;
 
@@ -95,7 +95,7 @@ public class HiveTestService {
 
     String localHiveLocation = getHiveLocation(workDir);
     if (clean) {
-      LOG.info("Cleaning Hive cluster data at: " + localHiveLocation + " and starting fresh.");
+      LOG.info("Cleaning Hive cluster data at: {} and starting fresh.", localHiveLocation);
       File file = new File(localHiveLocation);
       FileIOUtils.deleteDirectory(file);
     }
@@ -155,7 +155,7 @@ public class HiveTestService {
         return true;
       } catch (MetaException e) {
         // ignore as this is expected
-        LOG.info("server " + hostname + ":" + port + " not up " + e);
+        LOG.error("server {}:{} not up ", hostname, port, e);
       }
 
       if (System.currentTimeMillis() > start + timeout) {