Posted to commits@hudi.apache.org by le...@apache.org on 2020/01/06 14:47:12 UTC
[incubator-hudi] branch redo-log updated: [HUDI-463] Redo hudi-utilities log statements using SLF4J (#1177)
This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch redo-log
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/redo-log by this push:
new b98e931 [HUDI-463] Redo hudi-utilities log statements using SLF4J (#1177)
b98e931 is described below
commit b98e93179b33e4d837ee0fb41b4fa59c6e1c0e76
Author: ForwardXu <fo...@gmail.com>
AuthorDate: Mon Jan 6 22:47:01 2020 +0800
[HUDI-463] Redo hudi-utilities log statements using SLF4J (#1177)
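For context, the change applied across these files is mechanical: swap the
log4j LogManager/Logger pair for the SLF4J facade, and replace string
concatenation in log calls with {} placeholders so messages are only
formatted when the log level is enabled. A minimal sketch of the pattern
(HypotheticalJob is an invented class for illustration, not part of this
patch):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class HypotheticalJob {

      // was: private static final Logger LOG = LogManager.getLogger(HypotheticalJob.class);
      private static final Logger LOG = LoggerFactory.getLogger(HypotheticalJob.class);

      public void run(String path) {
        // was: LOG.info("Reading from " + path);  (concatenates even when INFO is disabled)
        LOG.info("Reading from {}", path);

        try {
          // ... do work ...
        } catch (Exception e) {
          // was: LOG.error(e);  (log4j accepted a bare Throwable; SLF4J requires a message)
          LOG.error("run failed", e);  // a trailing Throwable argument prints its stack trace
        }
      }
    }

Note that slf4j-api (added to the pom below) is only the facade; a binding
such as slf4j-log4j12 must be on the runtime classpath for output to
actually appear. Also, a trailing Throwable that is not consumed by a {}
placeholder is treated as the exception and logged with its stack trace.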
---
hudi-utilities/pom.xml | 5 +++
.../apache/hudi/utilities/HDFSParquetImporter.java | 15 +++----
.../hudi/utilities/HiveIncrementalPuller.java | 47 +++++++++++-----------
.../org/apache/hudi/utilities/HoodieCleaner.java | 8 ++--
.../org/apache/hudi/utilities/HoodieCompactor.java | 8 ++--
.../hudi/utilities/HoodieSnapshotCopier.java | 6 +--
.../org/apache/hudi/utilities/UtilHelpers.java | 8 ++--
.../adhoc/UpgradePayloadFromUberToApache.java | 32 ++++++++-------
.../AbstractDeltaStreamerService.java | 7 ++--
.../hudi/utilities/deltastreamer/Compactor.java | 11 ++---
.../hudi/utilities/deltastreamer/DeltaSync.java | 41 ++++++++++---------
.../deltastreamer/HoodieDeltaStreamer.java | 23 ++++++-----
.../deltastreamer/SchedulerConfGenerator.java | 8 ++--
.../hudi/utilities/perf/TimelineServerPerf.java | 8 ++--
.../hudi/utilities/sources/AvroKafkaSource.java | 8 ++--
.../hudi/utilities/sources/HiveIncrPullSource.java | 8 ++--
.../hudi/utilities/sources/HoodieIncrSource.java | 8 ++--
.../hudi/utilities/sources/JsonKafkaSource.java | 8 ++--
.../org/apache/hudi/utilities/sources/Source.java | 6 +--
.../utilities/sources/helpers/KafkaOffsetGen.java | 5 ---
.../utilities/transform/FlatteningTransformer.java | 8 ++--
.../transform/SqlQueryBasedTransformer.java | 10 ++---
.../hudi/utilities/TestHoodieDeltaStreamer.java | 12 +++---
.../utilities/sources/AbstractBaseTestSource.java | 6 +--
.../sources/DistributedTestDataSource.java | 6 +--
.../hudi/utilities/sources/TestDataSource.java | 6 +--
26 files changed, 165 insertions(+), 153 deletions(-)
diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml
index ba59e22..00c6ecb 100644
--- a/hudi-utilities/pom.xml
+++ b/hudi-utilities/pom.xml
@@ -124,6 +124,11 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
<!-- Fasterxml -->
<dependency>
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
index 4aa72d0..8b141a2 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java
@@ -42,8 +42,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.spark.api.java.JavaRDD;
@@ -59,6 +57,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Tuple2;
/**
@@ -66,7 +66,7 @@ import scala.Tuple2;
*/
public class HDFSParquetImporter implements Serializable {
- private static final Logger LOG = LogManager.getLogger(HDFSParquetImporter.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HDFSParquetImporter.class);
private static final DateTimeFormatter PARTITION_FORMATTER = DateTimeFormatter.ofPattern("yyyy/MM/dd")
.withZone(ZoneId.systemDefault());
@@ -103,7 +103,7 @@ public class HDFSParquetImporter implements Serializable {
this.fs = FSUtils.getFs(cfg.targetPath, jsc.hadoopConfiguration());
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
: UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
- LOG.info("Starting data import with configs : " + props.toString());
+ LOG.info("Starting data import with configs : {}", props.toString());
int ret = -1;
try {
// Verify that targetPath is not present.
@@ -114,7 +114,7 @@ public class HDFSParquetImporter implements Serializable {
ret = dataImport(jsc);
} while (ret != 0 && retry-- > 0);
} catch (Throwable t) {
- LOG.error(t);
+ LOG.error("The dataImport error:", t);
}
return ret;
}
@@ -175,13 +175,14 @@ public class HDFSParquetImporter implements Serializable {
throw new HoodieIOException("row field is missing. :" + cfg.rowKey);
}
String partitionPath = partitionField.toString();
- LOG.debug("Row Key : " + rowField + ", Partition Path is (" + partitionPath + ")");
+ LOG.debug("Row Key : {}, Partition Path is ({})", rowField, partitionPath);
if (partitionField instanceof Number) {
try {
long ts = (long) (Double.parseDouble(partitionField.toString()) * 1000L);
partitionPath = PARTITION_FORMATTER.format(Instant.ofEpochMilli(ts));
} catch (NumberFormatException nfe) {
- LOG.warn("Unable to parse date from partition field. Assuming partition as (" + partitionField + ")");
+ LOG.warn("Unable to parse date from partition field. Assuming partition as ({})",
+ partitionField);
}
}
return new HoodieRecord<>(new HoodieKey(rowField.toString(), partitionPath),
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
index 963bc7d..0a2ecde 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HiveIncrementalPuller.java
@@ -33,8 +33,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.stringtemplate.v4.ST;
import java.io.File;
@@ -49,6 +47,8 @@ import java.sql.Statement;
import java.util.List;
import java.util.Scanner;
import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Utility to pull data after a given commit, based on the supplied HiveQL and save the delta as another hive temporary
@@ -61,7 +61,7 @@ import java.util.stream.Collectors;
*/
public class HiveIncrementalPuller {
- private static final Logger LOG = LogManager.getLogger(HiveIncrementalPuller.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HiveIncrementalPuller.class);
private static String driverName = "org.apache.hive.jdbc.HiveDriver";
public static class Config implements Serializable {
@@ -129,10 +129,10 @@ public class HiveIncrementalPuller {
try {
if (config.fromCommitTime == null) {
config.fromCommitTime = inferCommitTime(fs);
- LOG.info("FromCommitTime inferred as " + config.fromCommitTime);
+ LOG.info("FromCommitTime inferred as {}", config.fromCommitTime);
}
- LOG.info("FromCommitTime - " + config.fromCommitTime);
+ LOG.info("FromCommitTime - {}", config.fromCommitTime);
String sourceTableLocation = getTableLocation(config.sourceDb, config.sourceTable);
String lastCommitTime = getLastCommitTimePulled(fs, sourceTableLocation);
if (lastCommitTime == null) {
@@ -180,15 +180,16 @@ public class HiveIncrementalPuller {
incrementalPullSQLtemplate.add("storedAsClause", storedAsClause);
String incrementalSQL = new Scanner(new File(config.incrementalSQLFile)).useDelimiter("\\Z").next();
if (!incrementalSQL.contains(config.sourceDb + "." + config.sourceTable)) {
- LOG.info("Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable
- + ", which means its pulling from a different table. Fencing this from happening.");
+ LOG.info(
+ "Incremental SQL does not have {}.{}, which means its pulling from a different table. Fencing this from happening.",
+ config.sourceDb, config.sourceTable);
throw new HoodieIncrementalPullSQLException(
"Incremental SQL does not have " + config.sourceDb + "." + config.sourceTable);
}
if (!incrementalSQL.contains("`_hoodie_commit_time` > '%targetBasePath'")) {
- LOG.info("Incremental SQL : " + incrementalSQL
- + " does not contain `_hoodie_commit_time` > '%targetBasePath'. Please add "
- + "this clause for incremental to work properly.");
+ LOG.info(
+ "Incremental SQL : {} does not contain `_hoodie_commit_time` > '%targetBasePath'. Please add "
+ + "this clause for incremental to work properly.", incrementalSQL);
throw new HoodieIncrementalPullSQLException(
"Incremental SQL does not have clause `_hoodie_commit_time` > '%targetBasePath', which "
+ "means its not pulling incrementally");
@@ -224,18 +225,18 @@ public class HiveIncrementalPuller {
}
private boolean deleteHDFSPath(FileSystem fs, String path) throws IOException {
- LOG.info("Deleting path " + path);
+ LOG.info("Deleting path {}", path);
return fs.delete(new Path(path), true);
}
private void executeStatement(String sql, Statement stmt) throws SQLException {
- LOG.info("Executing: " + sql);
+ LOG.info("Executing: {}", sql);
stmt.execute(sql);
}
private String inferCommitTime(FileSystem fs) throws SQLException, IOException {
- LOG.info("FromCommitTime not specified. Trying to infer it from Hoodie dataset " + config.targetDb + "."
- + config.targetTable);
+ LOG.info("FromCommitTime not specified. Trying to infer it from Hoodie dataset {}.{}",
+ config.targetDb, config.targetTable);
String targetDataLocation = getTableLocation(config.targetDb, config.targetTable);
return scanForCommitTime(fs, targetDataLocation);
}
@@ -249,7 +250,7 @@ public class HiveIncrementalPuller {
resultSet = stmt.executeQuery("describe formatted `" + db + "." + table + "`");
while (resultSet.next()) {
if (resultSet.getString(1).trim().equals("Location:")) {
- LOG.info("Inferred table location for " + db + "." + table + " as " + resultSet.getString(2));
+ LOG.info("Inferred table location for {}.{} as {}", db, table, resultSet.getString(2));
return resultSet.getString(2);
}
}
@@ -290,7 +291,7 @@ public class HiveIncrementalPuller {
private boolean ensureTempPathExists(FileSystem fs, String lastCommitTime) throws IOException {
Path targetBaseDirPath = new Path(config.hoodieTmpDir, config.targetTable + "__" + config.sourceTable);
if (!fs.exists(targetBaseDirPath)) {
- LOG.info("Creating " + targetBaseDirPath + " with permission drwxrwxrwx");
+ LOG.info("Creating {} with permission drwxrwxrwx", targetBaseDirPath);
boolean result =
FileSystem.mkdirs(fs, targetBaseDirPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
if (!result) {
@@ -305,7 +306,7 @@ public class HiveIncrementalPuller {
throw new HoodieException("Could not delete existing " + targetPath);
}
}
- LOG.info("Creating " + targetPath + " with permission drwxrwxrwx");
+ LOG.info("Creating {} with permission drwxrwxrwx", targetPath);
return FileSystem.mkdirs(fs, targetBaseDirPath, new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
}
@@ -316,19 +317,19 @@ public class HiveIncrementalPuller {
.collect(Collectors.toList());
if (commitsToSync.isEmpty()) {
LOG.warn(
- "Nothing to sync. All commits in "
- + config.sourceTable + " are " + metadata.getActiveTimeline().getCommitsTimeline()
- .filterCompletedInstants().getInstants().collect(Collectors.toList())
- + " and from commit time is " + config.fromCommitTime);
+ "Nothing to sync. All commits in {} are {} and from commit time is {}",
+ config.sourceTable, metadata.getActiveTimeline().getCommitsTimeline()
+ .filterCompletedInstants().getInstants().collect(Collectors.toList()),
+ config.fromCommitTime);
return null;
}
- LOG.info("Syncing commits " + commitsToSync);
+ LOG.info("Syncing commits {}", commitsToSync);
return commitsToSync.get(commitsToSync.size() - 1);
}
private Connection getConnection() throws SQLException {
if (connection == null) {
- LOG.info("Getting Hive Connection to " + config.hiveJDBCUrl);
+ LOG.info("Getting Hive Connection to {}", config.hiveJDBCUrl);
this.connection = DriverManager.getConnection(config.hiveJDBCUrl, config.hiveUsername, config.hivePassword);
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
index 9185d97..d21e4c6 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCleaner.java
@@ -27,18 +27,18 @@ import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class HoodieCleaner {
- private static final Logger LOG = LogManager.getLogger(HoodieCleaner.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HoodieCleaner.class);
/**
* Config for Cleaner.
@@ -66,7 +66,7 @@ public class HoodieCleaner {
this.fs = FSUtils.getFs(cfg.basePath, jssc.hadoopConfiguration());
this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs)
: UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
- LOG.info("Creating Cleaner with configs : " + props.toString());
+ LOG.info("Creating Cleaner with configs : {}", props.toString());
}
public void run() throws Exception {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
index 4ace07c..31a6ab4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java
@@ -28,18 +28,18 @@ import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class HoodieCompactor {
- private static final Logger LOG = LogManager.getLogger(HoodieCompactor.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HoodieCompactor.class);
private final Config cfg;
private transient FileSystem fs;
private TypedProperties props;
@@ -110,7 +110,7 @@ public class HoodieCompactor {
}
} while (ret != 0 && retry-- > 0);
} catch (Throwable t) {
- LOG.error(t);
+ LOG.error("The compact error:", t);
}
return ret;
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
index d24319e..2d64029 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieSnapshotCopier.java
@@ -36,8 +36,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
@@ -47,6 +45,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.Tuple2;
/**
@@ -54,7 +54,7 @@ import scala.Tuple2;
*/
public class HoodieSnapshotCopier implements Serializable {
- private static final Logger LOG = LogManager.getLogger(HoodieSnapshotCopier.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HoodieSnapshotCopier.class);
static class Config implements Serializable {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 4cb56e9..65b8e8f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -37,8 +37,6 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
@@ -53,12 +51,14 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Bunch of helper methods.
*/
public class UtilHelpers {
- private static final Logger LOG = LogManager.getLogger(UtilHelpers.class);
+ private static final Logger LOG = LoggerFactory.getLogger(UtilHelpers.class);
public static Source createSource(String sourceClass, TypedProperties cfg, JavaSparkContext jssc,
SparkSession sparkSession, SchemaProvider schemaProvider) throws IOException {
@@ -97,7 +97,7 @@ public class UtilHelpers {
conf = new DFSPropertiesConfiguration(cfgPath.getFileSystem(fs.getConf()), cfgPath);
} catch (Exception e) {
conf = new DFSPropertiesConfiguration();
- LOG.warn("Unexpected error read props file at :" + cfgPath, e);
+ LOG.warn("Unexpected error read props file at :{}", cfgPath, e);
}
try {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/adhoc/UpgradePayloadFromUberToApache.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/adhoc/UpgradePayloadFromUberToApache.java
index 6040437..8a6f75a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/adhoc/UpgradePayloadFromUberToApache.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/adhoc/UpgradePayloadFromUberToApache.java
@@ -28,8 +28,6 @@ import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import java.io.BufferedReader;
import java.io.FileReader;
@@ -38,15 +36,18 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * This is an one-time use class meant for migrating the configuration for "hoodie.compaction.payload.class" in
- * .hoodie/hoodie.properties from com.uber.hoodie to org.apache.hudi . It takes in a file containing base-paths for a set
- * of hudi datasets and does the migration
+ * This is a one-time use class meant for migrating the configuration for
+ * "hoodie.compaction.payload.class" in .hoodie/hoodie.properties from com.uber.hoodie to
+ * org.apache.hudi. It takes in a file containing base-paths for a set of hudi datasets and does
+ * the migration.
*/
public class UpgradePayloadFromUberToApache implements Serializable {
- private static final Logger LOG = LogManager.getLogger(UpgradePayloadFromUberToApache.class);
+ private static final Logger LOG = LoggerFactory.getLogger(UpgradePayloadFromUberToApache.class);
private final Config cfg;
@@ -59,36 +60,37 @@ public class UpgradePayloadFromUberToApache implements Serializable {
try (BufferedReader reader = new BufferedReader(new FileReader(cfg.inputPath))) {
basePath = reader.readLine();
} catch (IOException e) {
- LOG.error("Read from path: " + cfg.inputPath + " error.", e);
+ LOG.error("Read from path: {} error.", cfg.inputPath, e);
}
while (basePath != null) {
basePath = basePath.trim();
if (!basePath.startsWith("#")) {
- LOG.info("Performing upgrade for " + basePath);
+ LOG.info("Performing upgrade for {}", basePath);
String metaPath = String.format("%s/.hoodie", basePath);
HoodieTableMetaClient metaClient =
- new HoodieTableMetaClient(FSUtils.prepareHadoopConf(new Configuration()), basePath, false);
+ new HoodieTableMetaClient(FSUtils.prepareHadoopConf(new Configuration()), basePath,
+ false);
HoodieTableConfig tableConfig = metaClient.getTableConfig();
if (tableConfig.getTableType().equals(HoodieTableType.MERGE_ON_READ)) {
Map<String, String> propsMap = tableConfig.getProps();
if (propsMap.containsKey(HoodieCompactionConfig.PAYLOAD_CLASS_PROP)) {
String payloadClass = propsMap.get(HoodieCompactionConfig.PAYLOAD_CLASS_PROP);
- LOG.info("Found payload class=" + payloadClass);
+ LOG.info("Found payload class={}", payloadClass);
if (payloadClass.startsWith("com.uber.hoodie")) {
String newPayloadClass = payloadClass.replace("com.uber.hoodie", "org.apache.hudi");
- LOG.info("Replacing payload class (" + payloadClass + ") with (" + newPayloadClass + ")");
+ LOG.info("Replacing payload class ({}) with ({})", payloadClass, newPayloadClass);
Map<String, String> newPropsMap = new HashMap<>(propsMap);
newPropsMap.put(HoodieCompactionConfig.PAYLOAD_CLASS_PROP, newPayloadClass);
Properties props = new Properties();
props.putAll(newPropsMap);
- HoodieTableConfig.createHoodieProperties(metaClient.getFs(), new Path(metaPath), props);
- LOG.info("Finished upgrade for " + basePath);
+ HoodieTableConfig
+ .createHoodieProperties(metaClient.getFs(), new Path(metaPath), props);
+ LOG.info("Finished upgrade for {}", basePath);
}
}
} else {
- LOG.info("Skipping as this table is COW table. BasePath=" + basePath);
-
+ LOG.info("Skipping as this table is COW table. BasePath={}", basePath);
}
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java
index 5d36e8d..8fd62e7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/AbstractDeltaStreamerService.java
@@ -20,9 +20,6 @@ package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.common.util.collection.Pair;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -30,13 +27,15 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Base Class for running delta-sync/compaction in separate thread and controlling their life-cycle.
*/
public abstract class AbstractDeltaStreamerService implements Serializable {
- private static final Logger LOG = LogManager.getLogger(AbstractDeltaStreamerService.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractDeltaStreamerService.class);
// Flag to track if the service is started.
private boolean started;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java
index eb3212f..65bf598 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java
@@ -24,20 +24,20 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
import java.io.Serializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Run one round of compaction.
*/
public class Compactor implements Serializable {
- private static final Logger LOG = LogManager.getLogger(Compactor.class);
+ private static final Logger LOG = LoggerFactory.getLogger(Compactor.class);
private transient HoodieWriteClient compactionClient;
private transient JavaSparkContext jssc;
@@ -48,12 +48,13 @@ public class Compactor implements Serializable {
}
public void compact(HoodieInstant instant) throws IOException {
- LOG.info("Compactor executing compaction " + instant);
+ LOG.info("Compactor executing compaction {}", instant);
JavaRDD<WriteStatus> res = compactionClient.compact(instant.getTimestamp());
long numWriteErrors = res.collect().stream().filter(r -> r.hasErrors()).count();
if (numWriteErrors != 0) {
// We treat even a single error in compaction as fatal
- LOG.error("Compaction for instant (" + instant + ") failed with write errors. Errors :" + numWriteErrors);
+ LOG.error("Compaction for instant ({}) failed with write errors. Errors :{}", instant,
+ numWriteErrors);
throw new HoodieException(
"Compaction for instant (" + instant + ") failed with write errors. Errors :" + numWriteErrors);
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 7dfb015..20608b7 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -56,8 +56,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
@@ -73,6 +71,8 @@ import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;
import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
@@ -83,7 +83,7 @@ import static org.apache.hudi.utilities.schema.RowBasedSchemaProvider.HOODIE_REC
*/
public class DeltaSync implements Serializable {
- private static final Logger LOG = LogManager.getLogger(DeltaSync.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DeltaSync.class);
public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
public static String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";
@@ -168,7 +168,7 @@ public class DeltaSync implements Serializable {
this.tableType = tableType;
this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
this.props = props;
- LOG.info("Creating delta streamer with configs : " + props.toString());
+ LOG.info("Creating delta streamer with configs : {}", props.toString());
this.schemaProvider = schemaProvider;
refreshTimeline();
@@ -266,7 +266,7 @@ public class DeltaSync implements Serializable {
if (!resumeCheckpointStr.isPresent() && cfg.checkpoint != null) {
resumeCheckpointStr = Option.of(cfg.checkpoint);
}
- LOG.info("Checkpoint to resume from : " + resumeCheckpointStr);
+ LOG.info("Checkpoint to resume from : {}", resumeCheckpointStr);
final Option<JavaRDD<GenericRecord>> avroRDDOptional;
final String checkpointStr;
@@ -300,8 +300,9 @@ public class DeltaSync implements Serializable {
}
if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) {
- LOG.info("No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=("
- + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
+ LOG.info(
+ "No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=({}). New Checkpoint=({})",
+ resumeCheckpointStr, checkpointStr);
return null;
}
@@ -342,7 +343,7 @@ public class DeltaSync implements Serializable {
boolean isEmpty = records.isEmpty();
String commitTime = startCommit();
- LOG.info("Starting commit : " + commitTime);
+ LOG.info("Starting commit : {}", commitTime);
JavaRDD<WriteStatus> writeStatusRDD;
if (cfg.operation == Operation.INSERT) {
@@ -367,13 +368,14 @@ public class DeltaSync implements Serializable {
}
if (hasErrors) {
- LOG.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total="
- + totalErrorRecords + "/" + totalRecords);
+ LOG.warn(
+ "Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total={}/{}",
+ totalErrorRecords, totalRecords);
}
boolean success = writeClient.commit(commitTime, writeStatusRDD, Option.of(checkpointCommitMetadata));
if (success) {
- LOG.info("Commit " + commitTime + " successful!");
+ LOG.info("Commit {} successful!", commitTime);
// Schedule compaction if needed
if (cfg.isAsyncCompactionEnabled()) {
@@ -387,16 +389,18 @@ public class DeltaSync implements Serializable {
hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0;
}
} else {
- LOG.info("Commit " + commitTime + " failed!");
+ LOG.info("Commit {} failed!", commitTime);
throw new HoodieException("Commit " + commitTime + " failed!");
}
} else {
- LOG.error("Delta Sync found errors when writing. Errors/Total=" + totalErrorRecords + "/" + totalRecords);
+ LOG.error("Delta Sync found errors when writing. Errors/Total={}/{}", totalErrorRecords,
+ totalRecords);
LOG.error("Printing out the top 100 errors");
writeStatusRDD.filter(ws -> ws.hasErrors()).take(100).forEach(ws -> {
- LOG.error("Global error :", ws.getGlobalError());
+ LOG.error("Global error :{}", ws.getGlobalError());
if (ws.getErrors().size() > 0) {
- ws.getErrors().entrySet().forEach(r -> LOG.trace("Error for key:" + r.getKey() + " is " + r.getValue()));
+ ws.getErrors().entrySet()
+ .forEach(r -> LOG.trace("Error for key:{} is {}", r.getKey(), r.getValue()));
}
});
// Rolling back instant
@@ -438,8 +442,9 @@ public class DeltaSync implements Serializable {
private void syncHive() throws ClassNotFoundException {
if (cfg.enableHiveSync) {
HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath);
- LOG.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName + "). Hive metastore URL :"
- + hiveSyncConfig.jdbcUrl + ", basePath :" + cfg.targetBasePath);
+ LOG.info(
+ "Syncing target hoodie table with hive table({}). Hive metastore URL :{}, basePath :{}",
+ hiveSyncConfig.tableName, hiveSyncConfig.jdbcUrl, cfg.targetBasePath);
new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable();
}
@@ -503,7 +508,7 @@ public class DeltaSync implements Serializable {
schemas.add(schemaProvider.getTargetSchema());
}
- LOG.info("Registering Schema :" + schemas);
+ LOG.info("Registering Schema :{}", schemas);
jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index f8ddadb..3736e87 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -46,8 +46,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
@@ -66,6 +64,8 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A Utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target
@@ -78,7 +78,7 @@ import java.util.stream.IntStream;
*/
public class HoodieDeltaStreamer implements Serializable {
- private static final Logger LOG = LogManager.getLogger(HoodieDeltaStreamer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HoodieDeltaStreamer.class);
public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
@@ -131,7 +131,7 @@ public class HoodieDeltaStreamer implements Serializable {
}
private boolean onDeltaSyncShutdown(boolean error) {
- LOG.info("DeltaSync shutdown. Closing write client. Error?" + error);
+ LOG.info("DeltaSync shutdown. Closing write client. Error?{}", error);
deltaSyncService.close();
return true;
}
@@ -363,7 +363,7 @@ public class HoodieDeltaStreamer implements Serializable {
}
this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
- LOG.info("Creating delta streamer with configs : " + props.toString());
+ LOG.info("Creating delta streamer with configs : {}", props.toString());
this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc);
if (cfg.filterDupes) {
@@ -385,7 +385,7 @@ public class HoodieDeltaStreamer implements Serializable {
boolean error = false;
if (cfg.isAsyncCompactionEnabled()) {
// set Scheduler Pool.
- LOG.info("Setting Spark Pool name for delta-sync to " + SchedulerConfGenerator.DELTASYNC_POOL_NAME);
+ LOG.info("Setting Spark Pool name for delta-sync to {}", SchedulerConfGenerator.DELTASYNC_POOL_NAME);
jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.DELTASYNC_POOL_NAME);
}
try {
@@ -394,15 +394,15 @@ public class HoodieDeltaStreamer implements Serializable {
long start = System.currentTimeMillis();
Option<String> scheduledCompactionInstant = deltaSync.syncOnce();
if (scheduledCompactionInstant.isPresent()) {
- LOG.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstant + ")");
+ LOG.info("Enqueuing new pending compaction instant ({})", scheduledCompactionInstant);
asyncCompactService.enqueuePendingCompaction(new HoodieInstant(State.REQUESTED,
HoodieTimeline.COMPACTION_ACTION, scheduledCompactionInstant.get()));
asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions);
}
long toSleepMs = cfg.minSyncIntervalSeconds * 1000 - (System.currentTimeMillis() - start);
if (toSleepMs > 0) {
- LOG.info("Last sync ran less than min sync interval: " + cfg.minSyncIntervalSeconds + " s, sleep: "
- + toSleepMs + " ms.");
+ LOG.info("Last sync ran less than min sync interval: {} s, sleep: {} ms",
+ cfg.minSyncIntervalSeconds, toSleepMs);
Thread.sleep(toSleepMs);
}
} catch (Exception e) {
@@ -422,7 +422,7 @@ public class HoodieDeltaStreamer implements Serializable {
* Shutdown compactor as DeltaSync is shutdown.
*/
private void shutdownCompactor(boolean error) {
- LOG.info("Delta Sync shutdown. Error ?" + error);
+ LOG.info("Delta Sync shutdown. Error ?{}", error);
if (asyncCompactService != null) {
LOG.warn("Gracefully shutting down compactor");
asyncCompactService.shutdown(false);
@@ -561,7 +561,8 @@ public class HoodieDeltaStreamer implements Serializable {
IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
try {
// Set Compactor Pool Name for allowing users to prioritize compaction
- LOG.info("Setting Spark Pool name for compaction to " + SchedulerConfGenerator.COMPACT_POOL_NAME);
+ LOG.info("Setting Spark Pool name for compaction to {}",
+ SchedulerConfGenerator.COMPACT_POOL_NAME);
jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.COMPACT_POOL_NAME);
while (!isShutdownRequested()) {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java
index 09c4da0..862d59d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java
@@ -21,8 +21,6 @@ package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.util.Option;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import java.io.BufferedWriter;
@@ -32,6 +30,8 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Utility Class to generate Spark Scheduling allocation file. This kicks in only when user sets
@@ -39,7 +39,7 @@ import java.util.UUID;
*/
public class SchedulerConfGenerator {
- private static final Logger LOG = LogManager.getLogger(SchedulerConfGenerator.class);
+ private static final Logger LOG = LoggerFactory.getLogger(SchedulerConfGenerator.class);
public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync";
public static final String COMPACT_POOL_NAME = "hoodiecompact";
@@ -88,7 +88,7 @@ public class SchedulerConfGenerator {
BufferedWriter bw = new BufferedWriter(new FileWriter(tempConfigFile));
bw.write(generateConfig(deltaSyncWeight, compactionWeight, deltaSyncMinShare, compactionMinShare));
bw.close();
- LOG.info("Configs written to file" + tempConfigFile.getAbsolutePath());
+ LOG.info("Configs written to file {}", tempConfigFile.getAbsolutePath());
return tempConfigFile.getAbsolutePath();
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
index 1108f65..f8b6a6a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/perf/TimelineServerPerf.java
@@ -37,8 +37,6 @@ import com.codahale.metrics.UniformReservoir;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
@@ -55,10 +53,12 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TimelineServerPerf implements Serializable {
- private static final Logger LOG = LogManager.getLogger(TimelineServerPerf.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TimelineServerPerf.class);
private final Config cfg;
private transient TimelineService timelineServer;
private final boolean useExternalTimelineServer;
@@ -73,7 +73,7 @@ public class TimelineServerPerf implements Serializable {
private void setHostAddrFromSparkConf(SparkConf sparkConf) {
String hostAddr = sparkConf.get("spark.driver.host", null);
if (hostAddr != null) {
- LOG.info("Overriding hostIp to (" + hostAddr + ") found in spark-conf. It was " + this.hostAddr);
+ LOG.info("Overriding hostIp to ({}) found in spark-conf. It was {}", hostAddr, this.hostAddr);
this.hostAddr = hostAddr;
} else {
LOG.warn("Unable to find driver bind address from spark config");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
index 18ebff4..9a59333 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -27,20 +27,20 @@ import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import io.confluent.kafka.serializers.KafkaAvroDecoder;
import kafka.serializer.StringDecoder;
import org.apache.avro.generic.GenericRecord;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Reads avro serialized Kafka data, based on the confluent schema-registry.
*/
public class AvroKafkaSource extends AvroSource {
- private static final Logger LOG = LogManager.getLogger(AvroKafkaSource.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AvroKafkaSource.class);
private final KafkaOffsetGen offsetGen;
@@ -57,7 +57,7 @@ public class AvroKafkaSource extends AvroSource {
if (totalNewMsgs <= 0) {
return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
} else {
- LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
+ LOG.info("About to read {} from Kafka for topic :{}", totalNewMsgs, offsetGen.getTopicName());
}
JavaRDD<GenericRecord> newDataRDD = toRDD(offsetRanges);
return new InputBatch<>(Option.of(newDataRDD), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
index 666c260..5e546f3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HiveIncrPullSource.java
@@ -33,8 +33,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -46,6 +44,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Source to read deltas produced by {@link HiveIncrementalPuller}, commit by commit and apply to the target table
@@ -59,7 +59,7 @@ import java.util.stream.Collectors;
*/
public class HiveIncrPullSource extends AvroSource {
- private static final Logger LOG = LogManager.getLogger(HiveIncrPullSource.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HiveIncrPullSource.class);
private final transient FileSystem fs;
@@ -95,7 +95,7 @@ public class HiveIncrPullSource extends AvroSource {
commitTimes.add(splits[splits.length - 1]);
}
Collections.sort(commitTimes);
- LOG.info("Retrieved commit times " + commitTimes);
+ LOG.info("Retrieved commit times {}", commitTimes);
if (!latestTargetCommit.isPresent()) {
// start from the beginning
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
index 888eec7..532104d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java
@@ -28,8 +28,6 @@ import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
@@ -37,10 +35,12 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class HoodieIncrSource extends RowSource {
- private static final Logger LOG = LogManager.getLogger(HoodieIncrSource.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HoodieIncrSource.class);
protected static class Config {
@@ -109,7 +109,7 @@ public class HoodieIncrSource extends RowSource {
numInstantsPerFetch, beginInstant, readLatestOnMissingCkpt);
if (instantEndpts.getKey().equals(instantEndpts.getValue())) {
- LOG.warn("Already caught up. Begin Checkpoint was :" + instantEndpts.getKey());
+ LOG.warn("Already caught up. Begin Checkpoint was :{}", instantEndpts.getKey());
return Pair.of(Option.empty(), instantEndpts.getKey());
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index bd922ac..a146264 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -25,20 +25,20 @@ import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;
import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
import kafka.serializer.StringDecoder;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Read json kafka data.
*/
public class JsonKafkaSource extends JsonSource {
- private static final Logger LOG = LogManager.getLogger(JsonKafkaSource.class);
+ private static final Logger LOG = LoggerFactory.getLogger(JsonKafkaSource.class);
private final KafkaOffsetGen offsetGen;
@@ -55,7 +55,7 @@ public class JsonKafkaSource extends JsonSource {
if (totalNewMsgs <= 0) {
return new InputBatch<>(Option.empty(), lastCheckpointStr.isPresent() ? lastCheckpointStr.get() : "");
}
- LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" + offsetGen.getTopicName());
+ LOG.info("About to read {} from Kafka for topic :{}", totalNewMsgs, offsetGen.getTopicName());
JavaRDD<String> newDataRDD = toRDD(offsetRanges);
return new InputBatch<>(Option.of(newDataRDD), CheckpointUtils.offsetsToStr(offsetRanges));
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index 0760c73..d9d3299 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -22,18 +22,18 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaProvider;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import java.io.Serializable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Represents a source from which we can tail data. Assumes a constructor that takes properties.
*/
public abstract class Source<T> implements Serializable {
- private static final Logger LOG = LogManager.getLogger(Source.class);
+ private static final Logger LOG = LoggerFactory.getLogger(Source.class);
public enum SourceType {
JSON, AVRO, ROW
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index c17a5cf..3810e94 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -25,8 +25,6 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
import kafka.common.TopicAndPartition;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.streaming.kafka.KafkaCluster;
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset;
import org.apache.spark.streaming.kafka.OffsetRange;
@@ -50,9 +48,6 @@ import scala.util.Either;
* Source to read data from Kafka, incrementally.
*/
public class KafkaOffsetGen {
-
- private static final Logger LOG = LogManager.getLogger(KafkaOffsetGen.class);
-
public static class CheckpointUtils {
/**
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
index aabcb73..9f530e2 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/FlatteningTransformer.java
@@ -20,8 +20,6 @@ package org.apache.hudi.utilities.transform;
import org.apache.hudi.common.util.TypedProperties;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -30,6 +28,8 @@ import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.UUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Transformer that can flatten nested objects. It currently doesn't unnest arrays.
@@ -37,7 +37,7 @@ import java.util.UUID;
public class FlatteningTransformer implements Transformer {
private static final String TMP_TABLE = "HUDI_SRC_TMP_TABLE_";
- private static final Logger LOG = LogManager.getLogger(SqlQueryBasedTransformer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(FlatteningTransformer.class);
/**
* Configs supported.
@@ -48,7 +48,7 @@ public class FlatteningTransformer implements Transformer {
// tmp table name doesn't like dashes
String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
- LOG.info("Registering tmp table : " + tmpTable);
+ LOG.info("Registering tmp table : {}", tmpTable);
rowDataset.registerTempTable(tmpTable);
return sparkSession.sql("select " + flattenSchema(rowDataset.schema(), null) + " from " + tmpTable);
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
index 8210fb1..1db5fd0 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/SqlQueryBasedTransformer.java
@@ -20,14 +20,14 @@ package org.apache.hudi.utilities.transform;
import org.apache.hudi.common.util.TypedProperties;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.util.UUID;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A transformer that allows a sql-query template to be used to transform the source before writing to Hudi data-set.
@@ -36,7 +36,7 @@ import java.util.UUID;
*/
public class SqlQueryBasedTransformer implements Transformer {
- private static final Logger LOG = LogManager.getLogger(SqlQueryBasedTransformer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(SqlQueryBasedTransformer.class);
private static final String SRC_PATTERN = "<SRC>";
private static final String TMP_TABLE = "HOODIE_SRC_TMP_TABLE_";
@@ -59,10 +59,10 @@ public class SqlQueryBasedTransformer implements Transformer {
// tmp table name doesn't like dashes
String tmpTable = TMP_TABLE.concat(UUID.randomUUID().toString().replace("-", "_"));
- LOG.info("Registering tmp table : " + tmpTable);
+ LOG.info("Registering tmp table : {}", tmpTable);
rowDataset.registerTempTable(tmpTable);
String sqlStr = transformerSQL.replaceAll(SRC_PATTERN, tmpTable);
- LOG.info("SQL Query for transformation : (" + sqlStr + ")");
+ LOG.info("SQL Query for transformation : ({})", sqlStr);
return sparkSession.sql(sqlStr);
}
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index c5f6c76..a42b5d6 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -50,8 +50,6 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
@@ -77,6 +75,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -89,7 +89,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
private static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
private static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
- private static final Logger LOG = LogManager.getLogger(TestHoodieDeltaStreamer.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TestHoodieDeltaStreamer.class);
@BeforeClass
public static void initClass() throws Exception {
@@ -247,7 +247,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
static void assertAtleastNCompactionCommits(int minExpected, String datasetPath, FileSystem fs) {
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
- LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
+ LOG.info("Timeline Instants={}",
+ meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
int numCompactionCommits = (int) timeline.getInstants().count();
assertTrue("Got=" + numCompactionCommits + ", exp >=" + minExpected, minExpected <= numCompactionCommits);
}
@@ -255,7 +256,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
static void assertAtleastNDeltaCommits(int minExpected, String datasetPath, FileSystem fs) {
HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
- LOG.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
+ LOG.info("Timeline Instants={}",
+ meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
int numDeltaCommits = (int) timeline.getInstants().count();
assertTrue("Got=" + numDeltaCommits + ", exp >=" + minExpected, minExpected <= numDeltaCommits);
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
index 745b0f0..941ddeb 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
@@ -29,8 +29,6 @@ import org.apache.hudi.utilities.sources.config.TestSourceConfig;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
@@ -39,10 +37,12 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class AbstractBaseTestSource extends AvroSource {
- private static final Logger LOG = LogManager.getLogger(AbstractBaseTestSource.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractBaseTestSource.class);
static final int DEFAULT_PARTITION_NUM = 0;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/DistributedTestDataSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/DistributedTestDataSource.java
index 7153b2e..3c43aa3 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/DistributedTestDataSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/DistributedTestDataSource.java
@@ -24,21 +24,21 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.config.TestSourceConfig;
import org.apache.avro.generic.GenericRecord;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A Test DataSource which scales test-data generation by using spark parallelism.
*/
public class DistributedTestDataSource extends AbstractBaseTestSource {
- private static final Logger LOG = LogManager.getLogger(DistributedTestDataSource.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DistributedTestDataSource.class);
private final int numTestSourcePartitions;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java
index 0b52db9..ae09474 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDataSource.java
@@ -23,21 +23,21 @@ import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.avro.generic.GenericRecord;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import java.util.List;
import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An implementation of {@link Source}, that emits test upserts.
*/
public class TestDataSource extends AbstractBaseTestSource {
- private static final Logger LOG = LogManager.getLogger(TestDataSource.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TestDataSource.class);
public TestDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {