You are viewing a plain text version of this content; the canonical HTML version is available in the Apache mailing list archives.
Posted to commits@hudi.apache.org by le...@apache.org on 2020/01/13 03:18:20 UTC
[incubator-hudi] branch redo-log updated: [HUDI-458] Redo
hudi-hadoop-mr log statements using SLF4J (#1210)
This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch redo-log
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/redo-log by this push:
new 8f8cd42 [HUDI-458] Redo hudi-hadoop-mr log statements using SLF4J (#1210)
8f8cd42 is described below
commit 8f8cd42e9792e0a9648b68f933929ad87cd503c5
Author: Mathieu <49...@users.noreply.github.com>
AuthorDate: Mon Jan 13 11:18:09 2020 +0800
[HUDI-458] Redo hudi-hadoop-mr log statements using SLF4J (#1210)
---
hudi-hadoop-mr/pom.xml | 7 ++++++
.../org/apache/hudi/hadoop/HoodieHiveUtil.java | 12 +++++-----
.../hudi/hadoop/HoodieParquetInputFormat.java | 26 ++++++++++----------
.../hudi/hadoop/HoodieROTablePathFilter.java | 28 +++++++++++-----------
.../hudi/hadoop/RecordReaderValueIterator.java | 6 ++---
.../hadoop/hive/HoodieCombineHiveInputFormat.java | 27 ++++++++++-----------
.../realtime/AbstractRealtimeRecordReader.java | 24 +++++++++----------
.../realtime/HoodieParquetRealtimeInputFormat.java | 22 ++++++++---------
.../realtime/HoodieRealtimeRecordReader.java | 6 ++---
.../realtime/RealtimeCompactedRecordReader.java | 14 +++++------
10 files changed, 89 insertions(+), 83 deletions(-)
diff --git a/hudi-hadoop-mr/pom.xml b/hudi-hadoop-mr/pom.xml
index 6bc3c8e..2a222f3 100644
--- a/hudi-hadoop-mr/pom.xml
+++ b/hudi-hadoop-mr/pom.xml
@@ -81,6 +81,13 @@
<artifactId>hive-exec</artifactId>
</dependency>
+ <!-- Logging -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+
<!-- Hoodie - Test -->
<dependency>
<groupId>org.apache.hudi</groupId>
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
index 1db8c54..f371719 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveUtil.java
@@ -20,12 +20,12 @@ package org.apache.hudi.hadoop;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class HoodieHiveUtil {
- public static final Logger LOG = LogManager.getLogger(HoodieHiveUtil.class);
+ public static final Logger LOG = LoggerFactory.getLogger(HoodieHiveUtil.class);
public static final String HOODIE_CONSUME_MODE_PATTERN = "hoodie.%s.consume.mode";
public static final String HOODIE_START_COMMIT_PATTERN = "hoodie.%s.consume.start.timestamp";
@@ -43,20 +43,20 @@ public class HoodieHiveUtil {
if (maxCommits == MAX_COMMIT_ALL) {
maxCommits = Integer.MAX_VALUE;
}
- LOG.info("Read max commits - " + maxCommits);
+ LOG.info("Read max commits - {}", maxCommits);
return maxCommits;
}
public static String readStartCommitTime(JobContext job, String tableName) {
String startCommitTimestampName = String.format(HOODIE_START_COMMIT_PATTERN, tableName);
- LOG.info("Read start commit time - " + job.getConfiguration().get(startCommitTimestampName));
+ LOG.info("Read start commit time - {}", job.getConfiguration().get(startCommitTimestampName));
return job.getConfiguration().get(startCommitTimestampName);
}
public static String readMode(JobContext job, String tableName) {
String modePropertyName = String.format(HOODIE_CONSUME_MODE_PATTERN, tableName);
String mode = job.getConfiguration().get(modePropertyName, DEFAULT_SCAN_MODE);
- LOG.info(modePropertyName + ": " + mode);
+ LOG.info("Hoodie consume mode pattern is : {}, mode is : {}", modePropertyName, mode);
return mode;
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
index e8f7de0..ea92f11 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
@@ -42,8 +42,8 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -60,7 +60,7 @@ import java.util.stream.Collectors;
@UseFileSplitsFromInputFormat
public class HoodieParquetInputFormat extends MapredParquetInputFormat implements Configurable {
- private static final Logger LOG = LogManager.getLogger(HoodieParquetInputFormat.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HoodieParquetInputFormat.class);
protected Configuration conf;
@@ -69,7 +69,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
// Get all the file status from FileInputFormat and then do the filter
FileStatus[] fileStatuses = super.listStatus(job);
Map<HoodieTableMetaClient, List<FileStatus>> groupedFileStatus = groupFileStatus(fileStatuses);
- LOG.info("Found a total of " + groupedFileStatus.size() + " groups");
+ LOG.info("Found a total of {} groups", groupedFileStatus.size());
List<FileStatus> returns = new ArrayList<>();
for (Map.Entry<HoodieTableMetaClient, List<FileStatus>> entry : groupedFileStatus.entrySet()) {
HoodieTableMetaClient metadata = entry.getKey();
@@ -81,7 +81,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
FileStatus[] statuses = entry.getValue().toArray(new FileStatus[entry.getValue().size()]);
if (LOG.isDebugEnabled()) {
- LOG.debug("Hoodie Metadata initialized with completed commit Ts as :" + metadata);
+ LOG.debug("Hoodie Metadata initialized with completed commit Ts as : {}", metadata);
}
String tableName = metadata.getTableConfig().getTableName();
String mode = HoodieHiveUtil.readMode(Job.getInstance(job), tableName);
@@ -95,24 +95,24 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
String lastIncrementalTs = HoodieHiveUtil.readStartCommitTime(Job.getInstance(job), tableName);
// Total number of commits to return in this batch. Set this to -1 to get all the commits.
Integer maxCommits = HoodieHiveUtil.readMaxCommits(Job.getInstance(job), tableName);
- LOG.info("Last Incremental timestamp was set as " + lastIncrementalTs);
+ LOG.info("Last Incremental timestamp was set as {}", lastIncrementalTs);
List<String> commitsToReturn = timeline.findInstantsAfter(lastIncrementalTs, maxCommits).getInstants()
.map(HoodieInstant::getTimestamp).collect(Collectors.toList());
List<HoodieDataFile> filteredFiles =
roView.getLatestDataFilesInRange(commitsToReturn).collect(Collectors.toList());
for (HoodieDataFile filteredFile : filteredFiles) {
- LOG.info("Processing incremental hoodie file - " + filteredFile.getPath());
+ LOG.info("Processing incremental hoodie file - {}", filteredFile.getPath());
filteredFile = checkFileStatus(filteredFile);
returns.add(filteredFile.getFileStatus());
}
- LOG.info("Total paths to process after hoodie incremental filter " + filteredFiles.size());
+ LOG.info("Total paths to process after hoodie incremental filter {}", filteredFiles.size());
} else {
// filter files on the latest commit found
List<HoodieDataFile> filteredFiles = roView.getLatestDataFiles().collect(Collectors.toList());
- LOG.info("Total paths to process after hoodie filter " + filteredFiles.size());
+ LOG.info("Total paths to process after hoodie filter {}", filteredFiles.size());
for (HoodieDataFile filteredFile : filteredFiles) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Processing latest hoodie file - " + filteredFile.getPath());
+ LOG.debug("Processing latest hoodie file - {}", filteredFile.getPath());
}
filteredFile = checkFileStatus(filteredFile);
returns.add(filteredFile.getFileStatus());
@@ -133,7 +133,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
try {
if (dataFile.getFileSize() == 0) {
FileSystem fs = dataPath.getFileSystem(conf);
- LOG.info("Refreshing file status " + dataFile.getPath());
+ LOG.info("Refreshing file status {}", dataFile.getPath());
return new HoodieDataFile(fs.getFileStatus(dataPath));
}
return dataFile;
@@ -160,7 +160,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
metadata = getTableMetaClient(status.getPath().getFileSystem(conf), status.getPath().getParent());
nonHoodieBasePath = null;
} catch (DatasetNotFoundException | InvalidDatasetException e) {
- LOG.info("Handling a non-hoodie path " + status.getPath());
+ LOG.info("Handling a non-hoodie path {}", status.getPath());
metadata = null;
nonHoodieBasePath = status.getPath().getParent().toString();
}
@@ -213,7 +213,7 @@ public class HoodieParquetInputFormat extends MapredParquetInputFormat implement
levels = metadata.getPartitionDepth();
}
Path baseDir = HoodieHiveUtil.getNthParent(dataPath, levels);
- LOG.info("Reading hoodie metadata from path " + baseDir.toString());
+ LOG.info("Reading hoodie metadata from path {}", baseDir);
return new HoodieTableMetaClient(fs.getConf(), baseDir.toString());
}
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
index fae8111..4670e4f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.HashMap;
@@ -50,7 +50,7 @@ import java.util.stream.Collectors;
*/
public class HoodieROTablePathFilter implements PathFilter, Serializable {
- private static final Logger LOG = LogManager.getLogger(HoodieROTablePathFilter.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HoodieROTablePathFilter.class);
/**
* Its quite common, to have all files from a given partition path be passed into accept(), cache the check for hoodie
@@ -88,7 +88,7 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
public boolean accept(Path path) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Checking acceptance for path " + path);
+ LOG.debug("Checking acceptance for path {}", path);
}
Path folder = null;
try {
@@ -101,15 +101,15 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
// Try to use the caches.
if (nonHoodiePathCache.contains(folder.toString())) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Accepting non-hoodie path from cache: " + path);
+ LOG.debug("Accepting non-hoodie path from cache: {}", path);
}
return true;
}
if (hoodiePathCache.containsKey(folder.toString())) {
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("%s Hoodie path checked against cache, accept => %s \n", path,
- hoodiePathCache.get(folder.toString()).contains(path)));
+ LOG.debug("{} Hoodie path checked against cache, accept => {}", path,
+ hoodiePathCache.get(folder.toString()).contains(path));
}
return hoodiePathCache.get(folder.toString()).contains(path);
}
@@ -119,7 +119,7 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
if (filePath.contains("/" + HoodieTableMetaClient.METAFOLDER_NAME + "/")
|| filePath.endsWith("/" + HoodieTableMetaClient.METAFOLDER_NAME)) {
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Skipping Hoodie Metadata file %s \n", filePath));
+ LOG.debug("Skipping Hoodie Metadata file {}", filePath);
}
return false;
}
@@ -144,22 +144,22 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
if (!hoodiePathCache.containsKey(folder.toString())) {
hoodiePathCache.put(folder.toString(), new HashSet<>());
}
- LOG.info("Based on hoodie metadata from base path: " + baseDir.toString() + ", caching " + latestFiles.size()
- + " files under " + folder);
+ LOG.info("Based on hoodie metadata from base path: {}, caching {} files under {}", baseDir,
+ latestFiles.size(), folder);
for (HoodieDataFile lfile : latestFiles) {
hoodiePathCache.get(folder.toString()).add(new Path(lfile.getPath()));
}
// accept the path, if its among the latest files.
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("%s checked after cache population, accept => %s \n", path,
- hoodiePathCache.get(folder.toString()).contains(path)));
+ LOG.debug("{} checked after cache population, accept => {}", path,
+ hoodiePathCache.get(folder.toString()).contains(path));
}
return hoodiePathCache.get(folder.toString()).contains(path);
} catch (DatasetNotFoundException e) {
// Non-hoodie path, accept it.
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("(1) Caching non-hoodie path under %s \n", folder.toString()));
+ LOG.debug("(1) Caching non-hoodie path under {}", folder);
}
nonHoodiePathCache.add(folder.toString());
return true;
@@ -167,7 +167,7 @@ public class HoodieROTablePathFilter implements PathFilter, Serializable {
} else {
// files is at < 3 level depth in FS tree, can't be hoodie dataset
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("(2) Caching non-hoodie path under %s \n", folder.toString()));
+ LOG.debug("(2) Caching non-hoodie path under {}", folder);
}
nonHoodiePathCache.add(folder.toString());
return true;
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java
index 0386186..7ffa3bf 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java
@@ -21,8 +21,8 @@ package org.apache.hudi.hadoop;
import org.apache.hudi.exception.HoodieException;
import org.apache.hadoop.mapred.RecordReader;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
@@ -36,7 +36,7 @@ import java.util.NoSuchElementException;
*/
public class RecordReaderValueIterator<K, V> implements Iterator<V> {
- private static final Logger LOG = LogManager.getLogger(RecordReaderValueIterator.class);
+ private static final Logger LOG = LoggerFactory.getLogger(RecordReaderValueIterator.class);
private final RecordReader<K, V> reader;
private V nextVal = null;
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
index cd1cea3..4ed7ba9 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
@@ -57,8 +57,8 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.CombineFileInputFormat;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.DataInput;
import java.io.DataOutput;
@@ -93,7 +93,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
extends HiveInputFormat<K, V> {
private static final String CLASS_NAME = HoodieCombineHiveInputFormat.class.getName();
- public static final Logger LOG = LogManager.getLogger(CLASS_NAME);
+ public static final Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
// max number of threads we can use to check non-combinable paths
private static final int MAX_CHECK_NONCOMBINABLE_THREAD_NUM = 50;
@@ -125,7 +125,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
if (inputFormat instanceof AvoidSplitCombination
&& ((AvoidSplitCombination) inputFormat).shouldSkipCombine(paths[i + start], conf)) {
if (LOG.isDebugEnabled()) {
- LOG.debug("The path [" + paths[i + start] + "] is being parked for HiveInputFormat.getSplits");
+ LOG.debug("The path [{}] is being parked for HiveInputFormat.getSplits", paths[i + start]);
}
nonCombinablePathIndices.add(i + start);
}
@@ -388,7 +388,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
Class inputFormatClass = part.getInputFileFormatClass();
String inputFormatClassName = inputFormatClass.getName();
InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
- LOG.info("Input Format => " + inputFormatClass.getName());
+ LOG.info("Input Format => {}", inputFormatClass.getName());
// **MOD** Set the hoodie filter in the combine
if (inputFormatClass.getName().equals(HoodieParquetInputFormat.class.getName())) {
combine.setHoodieFilter(true);
@@ -428,11 +428,11 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
f = poolMap.get(combinePathInputFormat);
if (f == null) {
f = new CombineFilter(filterPath);
- LOG.info("CombineHiveInputSplit creating pool for " + path + "; using filter path " + filterPath);
+ LOG.info("CombineHiveInputSplit creating pool for {}; using filter path {}", path, filterPath);
combine.createPool(job, f);
poolMap.put(combinePathInputFormat, f);
} else {
- LOG.info("CombineHiveInputSplit: pool is already created for " + path + "; using filter path " + filterPath);
+ LOG.info("CombineHiveInputSplit: pool is already created for {}; using filter path {}", path, filterPath);
f.addPath(filterPath);
}
} else {
@@ -481,7 +481,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
result.add(csplit);
}
- LOG.info("number of splits " + result.size());
+ LOG.info("number of splits {}", result.size());
return result.toArray(new CombineHiveInputSplit[result.size()]);
}
@@ -491,8 +491,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
@VisibleForTesting
public Set<Integer> getNonCombinablePathIndices(JobConf job, Path[] paths, int numThreads)
throws ExecutionException, InterruptedException {
- LOG.info("Total number of paths: " + paths.length + ", launching " + numThreads
- + " threads to check non-combinable ones.");
+ LOG.info("Total number of paths: {}, launching {} threads to check non-combinable ones.", paths.length, numThreads);
int numPathPerThread = (int) Math.ceil((double) paths.length / numThreads);
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
@@ -555,8 +554,8 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
// Store the previous value for the path specification
String oldPaths = job.get(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
if (LOG.isDebugEnabled()) {
- LOG.debug("The received input paths are: [" + oldPaths + "] against the property "
- + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
+ LOG.debug("The received input paths are: [{}] against the property {}", oldPaths,
+ org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
}
// Process the normal splits
@@ -589,7 +588,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
// clear work from ThreadLocal after splits generated in case of thread is reused in pool.
Utilities.clearWorkMapForConf(job);
- LOG.info("Number of all splits " + result.size());
+ LOG.info("Number of all splits {}", result.size());
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
return result.toArray(new InputSplit[result.size()]);
}
@@ -691,7 +690,7 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
retLists.add(split);
long splitgLength = split.getLength();
if (size + splitgLength >= targetSize) {
- LOG.info("Sample alias " + entry.getValue() + " using " + (i + 1) + "splits");
+ LOG.info("Sample alias {} using {} splits", entry.getValue(), i + 1);
if (size + splitgLength > targetSize) {
((InputSplitShim) split).shrinkSplit(targetSize - size);
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index d7b50d4..6c7bab1 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -45,11 +45,11 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -84,7 +84,7 @@ public abstract class AbstractRealtimeRecordReader {
// Default file path prefix for spillable file
public static final String DEFAULT_SPILLABLE_MAP_BASE_PATH = "/tmp/";
- private static final Logger LOG = LogManager.getLogger(AbstractRealtimeRecordReader.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractRealtimeRecordReader.class);
protected final HoodieRealtimeFileSplit split;
protected final JobConf jobConf;
@@ -98,12 +98,12 @@ public abstract class AbstractRealtimeRecordReader {
public AbstractRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job) {
this.split = split;
this.jobConf = job;
- LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
- LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
- LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
+ LOG.info("cfg ==> {}", job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
+ LOG.info("columnIds ==> {}", job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+ LOG.info("partitioningColumns ==> {}", job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
try {
this.usesCustomPayload = usesCustomPayload();
- LOG.info("usesCustomPayload ==> " + this.usesCustomPayload);
+ LOG.info("usesCustomPayload ==> {}", this.usesCustomPayload);
baseFileSchema = readSchema(jobConf, split.getPath());
init();
} catch (IOException e) {
@@ -339,10 +339,10 @@ public abstract class AbstractRealtimeRecordReader {
LogReaderUtils.readLatestSchemaFromLogFiles(split.getBasePath(), split.getDeltaFilePaths(), jobConf);
if (schemaFromLogFile == null) {
writerSchema = new AvroSchemaConverter().convert(baseFileSchema);
- LOG.debug("Writer Schema From Parquet => " + writerSchema.getFields());
+ LOG.debug("Writer Schema From Parquet => {}", writerSchema.getFields());
} else {
writerSchema = schemaFromLogFile;
- LOG.debug("Writer Schema From Log => " + writerSchema.getFields());
+ LOG.debug("Writer Schema From Log => {}", writerSchema.getFields());
}
// Add partitioning fields to writer schema for resulting row to contain null values for these fields
String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
@@ -359,8 +359,8 @@ public abstract class AbstractRealtimeRecordReader {
// to null out fields not present before
readerSchema = generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields);
- LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
- split.getDeltaFilePaths(), split.getPath(), projectionFields));
+ LOG.info("About to read compacted logs {} for base split {}, projecting cols {}",
+ split.getDeltaFilePaths(), split.getPath(), projectionFields);
}
private Schema constructHiveOrderedSchema(Schema writerSchema, Map<String, Field> schemaFieldsMap) {
@@ -377,7 +377,7 @@ public abstract class AbstractRealtimeRecordReader {
} else {
// Hive has some extra virtual columns like BLOCK__OFFSET__INSIDE__FILE which do not exist in table schema.
// They will get skipped as they won't be found in the original schema.
- LOG.debug("Skipping Hive Column => " + columnName);
+ LOG.debug("Skipping Hive Column => {}", columnName);
}
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index f62f288..0320257 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -46,8 +46,8 @@ import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -65,7 +65,7 @@ import java.util.stream.Stream;
@UseFileSplitsFromInputFormat
public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat implements Configurable {
- private static final Logger LOG = LogManager.getLogger(HoodieParquetRealtimeInputFormat.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HoodieParquetRealtimeInputFormat.class);
// These positions have to be deterministic across all tables
public static final int HOODIE_COMMIT_TIME_COL_POS = 0;
@@ -148,7 +148,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
throw new HoodieException("Error obtaining data file/log file grouping: " + partitionPath, e);
}
});
- LOG.info("Returning a total splits of " + rtSplits.size());
+ LOG.info("Returning a total splits of {}", rtSplits.size());
return rtSplits.toArray(new InputSplit[rtSplits.size()]);
}
@@ -180,9 +180,9 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColNamesPrefix + fieldName);
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, readColIdsPrefix + fieldIndex);
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("Adding extra column " + fieldName + ", to enable log merging cols (%s) ids (%s) ",
+ LOG.debug("Adding extra column {}, to enable log merging cols ({}) ids ({}) ", fieldName,
conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
- conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)));
+ conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
}
}
return conf;
@@ -210,7 +210,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
if (!columnIds.isEmpty() && columnIds.charAt(0) == ',') {
conf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, columnIds.substring(1));
if (LOG.isDebugEnabled()) {
- LOG.debug("The projection Ids: {" + columnIds + "} start with ','. First comma is removed");
+ LOG.debug("The projection Ids: [{}] start with ','. First comma is removed", columnIds);
}
}
return conf;
@@ -226,9 +226,9 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
// actual heavy lifting of reading the parquet files happen.
if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
synchronized (job) {
- LOG.info(
- "Before adding Hoodie columns, Projections :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
- + ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+ LOG.info("Before adding Hoodie columns, Projections : {}, Ids : {}",
+ job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
+ job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
if (job.get(HOODIE_READ_COLUMNS_PROP) == null) {
// Hive (across all versions) fails for queries like select count(`_hoodie_commit_time`) from table;
// In this case, the projection fields gets removed. Looking at HiveInputFormat implementation, in some cases
@@ -244,7 +244,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat i
}
}
- LOG.info("Creating record reader with readCols :" + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ LOG.info("Creating record reader with readCols : {}", job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
+ ", Ids :" + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
// sanity check
Preconditions.checkArgument(split instanceof HoodieRealtimeFileSplit,
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java
index cb8606e..bd02971 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeRecordReader.java
@@ -24,8 +24,8 @@ import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
@@ -39,7 +39,7 @@ public class HoodieRealtimeRecordReader implements RecordReader<NullWritable, Ar
public static final String REALTIME_SKIP_MERGE_PROP = "hoodie.realtime.merge.skip";
// By default, we do merged-reading
public static final String DEFAULT_REALTIME_SKIP_MERGE = "false";
- private static final Logger LOG = LogManager.getLogger(HoodieRealtimeRecordReader.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HoodieRealtimeRecordReader.class);
private final RecordReader<NullWritable, ArrayWritable> reader;
public HoodieRealtimeRecordReader(HoodieRealtimeFileSplit split, JobConf job,
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index deeaaf4..5ad59af 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
@@ -40,7 +40,7 @@ import java.util.Map;
class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
implements RecordReader<NullWritable, ArrayWritable> {
- private static final Logger LOG = LogManager.getLogger(AbstractRealtimeRecordReader.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractRealtimeRecordReader.class);
protected final RecordReader<NullWritable, ArrayWritable> parquetReader;
private final Map<String, HoodieRecord<? extends HoodieRecordPayload>> deltaRecordMap;
@@ -108,8 +108,8 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
ArrayWritable aWritable = (ArrayWritable) avroToArrayWritable(recordToReturn, getHiveSchema());
Writable[] replaceValue = aWritable.get();
if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("key %s, base values: %s, log values: %s", key, arrayWritableToString(arrayWritable),
- arrayWritableToString(aWritable)));
+ LOG.debug("key {}, base values: {}, log values: {}", key, arrayWritableToString(arrayWritable),
+ arrayWritableToString(aWritable));
}
Writable[] originalValue = arrayWritable.get();
try {
@@ -117,8 +117,8 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
arrayWritable.set(originalValue);
} catch (RuntimeException re) {
LOG.error("Got exception when doing array copy", re);
- LOG.error("Base record :" + arrayWritableToString(arrayWritable));
- LOG.error("Log record :" + arrayWritableToString(aWritable));
+ LOG.error("Base record : {}", arrayWritableToString(arrayWritable));
+ LOG.error("Log record : {}", arrayWritableToString(aWritable));
String errMsg = "Base-record :" + arrayWritableToString(arrayWritable)
+ " ,Log-record :" + arrayWritableToString(aWritable) + " ,Error :" + re.getMessage();
throw new RuntimeException(errMsg, re);