Posted to commits@hudi.apache.org by le...@apache.org on 2020/01/10 01:38:41 UTC
[incubator-hudi] branch redo-log updated: [HUDI-459] Redo hudi-hive log statements using SLF4J (#1203)
This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch redo-log
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/redo-log by this push:
new ac105e6 [HUDI-459] Redo hudi-hive log statements using SLF4J (#1203)
ac105e6 is described below
commit ac105e6d9b6dcd75f8145af6ce03600af40180e0
Author: lamber-ken <la...@163.com>
AuthorDate: Fri Jan 10 09:38:34 2020 +0800
[HUDI-459] Redo hudi-hive log statements using SLF4J (#1203)
---
hudi-hive/pom.xml | 5 +++
.../java/org/apache/hudi/hive/HiveSyncTool.java | 30 +++++++--------
.../org/apache/hudi/hive/HoodieHiveClient.java | 44 +++++++++++-----------
.../java/org/apache/hudi/hive/util/SchemaUtil.java | 12 +++---
.../org/apache/hudi/hive/util/HiveTestService.java | 10 ++---
5 files changed, 53 insertions(+), 48 deletions(-)
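
[Editor's note, not part of the commit] The pattern applied throughout this change replaces Log4j's LogManager and string-concatenated messages with SLF4J's LoggerFactory and parameterized "{}" messages. A minimal sketch of the before/after, assuming a hypothetical class name (only the logging calls mirror the diff below):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ExampleSync {
      // SLF4J logger obtained from LoggerFactory instead of log4j's LogManager
      private static final Logger LOG = LoggerFactory.getLogger(ExampleSync.class);

      void sync(String tableName, String basePath) {
        // Before (log4j + concatenation): the message string is built eagerly
        // LOG.info("Trying to sync hoodie table " + tableName + " with base path " + basePath);

        // After (SLF4J parameterized message): the arguments are substituted
        // into the message only if the INFO level is enabled
        LOG.info("Trying to sync hoodie table {} with base path {}", tableName, basePath);
      }
    }

The slf4j-api dependency added to hudi-hive/pom.xml below supplies these Logger and LoggerFactory types; the concrete binding is expected to come from the existing log4j dependency at runtime.
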
diff --git a/hudi-hive/pom.xml b/hudi-hive/pom.xml
index c552b70..1ab2533 100644
--- a/hudi-hive/pom.xml
+++ b/hudi-hive/pom.xml
@@ -49,6 +49,11 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 6bcb697..4029096 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -34,8 +34,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.parquet.schema.MessageType;
import java.util.List;
@@ -52,7 +52,7 @@ import java.util.stream.Collectors;
@SuppressWarnings("WeakerAccess")
public class HiveSyncTool {
- private static final Logger LOG = LogManager.getLogger(HiveSyncTool.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HiveSyncTool.class);
private final HoodieHiveClient hoodieHiveClient;
public static final String SUFFIX_REALTIME_TABLE = "_rt";
private final HiveSyncConfig cfg;
@@ -79,7 +79,7 @@ public class HiveSyncTool {
cfg.tableName = originalTableName;
break;
default:
- LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
+ LOG.error("Unknown table type {}", hoodieHiveClient.getTableType());
throw new InvalidDatasetException(hoodieHiveClient.getBasePath());
}
} catch (RuntimeException re) {
@@ -90,8 +90,8 @@ public class HiveSyncTool {
}
private void syncHoodieTable(boolean isRealTime) throws ClassNotFoundException {
- LOG.info("Trying to sync hoodie table " + cfg.tableName + " with base path " + hoodieHiveClient.getBasePath()
- + " of type " + hoodieHiveClient.getTableType());
+ LOG.info("Trying to sync hoodie table {} with base path {} of type {}",
+ cfg.tableName, hoodieHiveClient.getBasePath(), hoodieHiveClient.getTableType());
// Check if the necessary table exists
boolean tableExists = hoodieHiveClient.doesTableExist();
@@ -100,20 +100,20 @@ public class HiveSyncTool {
// Sync schema if needed
syncSchema(tableExists, isRealTime, schema);
- LOG.info("Schema sync complete. Syncing partitions for " + cfg.tableName);
+ LOG.info("Schema sync complete. Syncing partitions for {}", cfg.tableName);
// Get the last time we successfully synced partitions
Option<String> lastCommitTimeSynced = Option.empty();
if (tableExists) {
lastCommitTimeSynced = hoodieHiveClient.getLastCommitTimeSynced();
}
- LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null"));
+ LOG.info("Last commit time synced was found to be {}", lastCommitTimeSynced.orElse("null"));
List<String> writtenPartitionsSince = hoodieHiveClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
- LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());
+ LOG.info("Storage partitions scan complete. Found {}", writtenPartitionsSince.size());
// Sync the partitions if needed
syncPartitions(writtenPartitionsSince);
hoodieHiveClient.updateLastCommitTimeSynced();
- LOG.info("Sync complete for " + cfg.tableName);
+ LOG.info("Sync complete for {}", cfg.tableName);
}
/**
@@ -126,7 +126,7 @@ public class HiveSyncTool {
private void syncSchema(boolean tableExists, boolean isRealTime, MessageType schema) throws ClassNotFoundException {
// Check and sync schema
if (!tableExists) {
- LOG.info("Table " + cfg.tableName + " is not found. Creating it");
+ LOG.info("Table {} is not found. Creating it", cfg.tableName);
if (!isRealTime) {
// TODO - RO Table for MOR only after major compaction (UnboundedCompaction is default
// for now)
@@ -150,10 +150,10 @@ public class HiveSyncTool {
Map<String, String> tableSchema = hoodieHiveClient.getTableSchema();
SchemaDifference schemaDiff = SchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields);
if (!schemaDiff.isEmpty()) {
- LOG.info("Schema difference found for " + cfg.tableName);
+ LOG.info("Schema difference found for {}", cfg.tableName);
hoodieHiveClient.updateTableDefinition(schema);
} else {
- LOG.info("No Schema difference for " + cfg.tableName);
+ LOG.info("No Schema difference for {}", cfg.tableName);
}
}
}
@@ -168,10 +168,10 @@ public class HiveSyncTool {
List<PartitionEvent> partitionEvents =
hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
List<String> newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD);
- LOG.info("New Partitions " + newPartitions);
+ LOG.info("New Partitions {}", newPartitions);
hoodieHiveClient.addPartitionsToTable(newPartitions);
List<String> updatePartitions = filterPartitions(partitionEvents, PartitionEventType.UPDATE);
- LOG.info("Changed Partitions " + updatePartitions);
+ LOG.info("Changed Partitions {}", updatePartitions);
hoodieHiveClient.updatePartitionsToTable(updatePartitions);
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to sync partitions for table " + cfg.tableName, e);
diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index d176500..820e59b 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -48,8 +48,8 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.jdbc.HiveDriver;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -87,7 +87,7 @@ public class HoodieHiveClient {
}
}
- private static final Logger LOG = LogManager.getLogger(HoodieHiveClient.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HoodieHiveClient.class);
private final HoodieTableMetaClient metaClient;
private final HoodieTableType tableType;
private final PartitionValueExtractor partitionValueExtractor;
@@ -108,7 +108,7 @@ public class HoodieHiveClient {
// Support both JDBC and metastore based implementations for backwards compatiblity. Future users should
// disable jdbc and depend on metastore client for all hive registrations
if (cfg.useJdbc) {
- LOG.info("Creating hive connection " + cfg.jdbcUrl);
+ LOG.info("Creating hive connection {}", cfg.jdbcUrl);
createHiveConnection();
}
try {
@@ -137,10 +137,10 @@ public class HoodieHiveClient {
*/
void addPartitionsToTable(List<String> partitionsToAdd) {
if (partitionsToAdd.isEmpty()) {
- LOG.info("No partitions to add for " + syncConfig.tableName);
+ LOG.info("No partitions to add for {}", syncConfig.tableName);
return;
}
- LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + syncConfig.tableName);
+ LOG.info("Adding partitions {} to table {}", partitionsToAdd.size(), syncConfig.tableName);
String sql = constructAddPartitions(partitionsToAdd);
updateHiveSQL(sql);
}
@@ -150,10 +150,10 @@ public class HoodieHiveClient {
*/
void updatePartitionsToTable(List<String> changedPartitions) {
if (changedPartitions.isEmpty()) {
- LOG.info("No partitions to change for " + syncConfig.tableName);
+ LOG.info("No partitions to change for {}", syncConfig.tableName);
return;
}
- LOG.info("Changing partitions " + changedPartitions.size() + " on " + syncConfig.tableName);
+ LOG.info("Changing partitions {} on {}", changedPartitions.size(), syncConfig.tableName);
List<String> sqls = constructChangePartitions(changedPartitions);
for (String sql : sqls) {
updateHiveSQL(sql);
@@ -260,7 +260,7 @@ public class HoodieHiveClient {
.append(HIVE_ESCAPE_CHARACTER).append(syncConfig.tableName)
.append(HIVE_ESCAPE_CHARACTER).append(" REPLACE COLUMNS(")
.append(newSchemaStr).append(" )").append(cascadeClause);
- LOG.info("Updating table definition with " + sqlBuilder);
+ LOG.info("Updating table definition with {}", sqlBuilder);
updateHiveSQL(sqlBuilder.toString());
} catch (IOException e) {
throw new HoodieHiveSyncException("Failed to update table for " + syncConfig.tableName, e);
@@ -271,7 +271,7 @@ public class HoodieHiveClient {
try {
String createSQLQuery =
SchemaUtil.generateCreateDDL(storageSchema, syncConfig, inputFormatClass, outputFormatClass, serdeClass);
- LOG.info("Creating table with " + createSQLQuery);
+ LOG.info("Creating table with {}", createSQLQuery);
updateHiveSQL(createSQLQuery);
} catch (IOException e) {
throw new HoodieHiveSyncException("Failed to create table " + syncConfig.tableName, e);
@@ -329,7 +329,7 @@ public class HoodieHiveClient {
schema.putAll(columnsMap);
schema.putAll(partitionKeysMap);
final long end = System.currentTimeMillis();
- LOG.info(String.format("Time taken to getTableSchema: %s ms", (end - start)));
+ LOG.info("Time taken to getTableSchema: {} ms", (end - start));
return schema;
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to get table schema for : " + syncConfig.tableName, e);
@@ -364,7 +364,7 @@ public class HoodieHiveClient {
// Get a datafile written and get the schema from that file
Option<HoodieInstant> lastCompactionCommit =
metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
- LOG.info("Found the last compaction commit as " + lastCompactionCommit);
+ LOG.info("Found the last compaction commit as {}", lastCompactionCommit);
Option<HoodieInstant> lastDeltaCommit;
if (lastCompactionCommit.isPresent()) {
@@ -374,7 +374,7 @@ public class HoodieHiveClient {
lastDeltaCommit =
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
}
- LOG.info("Found the last delta commit " + lastDeltaCommit);
+ LOG.info("Found the last delta commit {}", lastDeltaCommit);
if (lastDeltaCommit.isPresent()) {
HoodieInstant lastDeltaInstant = lastDeltaCommit.get();
@@ -407,7 +407,7 @@ public class HoodieHiveClient {
return readSchemaFromLastCompaction(lastCompactionCommit);
}
default:
- LOG.error("Unknown table type " + tableType);
+ LOG.error("Unknown table type {}", tableType);
throw new InvalidDatasetException(syncConfig.basePath);
}
} catch (IOException e) {
@@ -441,7 +441,7 @@ public class HoodieHiveClient {
MessageType messageType = SchemaUtil.readSchemaFromLogFile(fs, path);
// Fall back to read the schema from last compaction
if (messageType == null) {
- LOG.info("Falling back to read the schema from last compaction " + lastCompactionCommitOpt);
+ LOG.info("Falling back to read the schema from last compaction {}", lastCompactionCommitOpt);
return readSchemaFromLastCompaction(lastCompactionCommitOpt);
}
return messageType;
@@ -451,7 +451,7 @@ public class HoodieHiveClient {
* Read the parquet schema from a parquet File.
*/
private MessageType readSchemaFromDataFile(Path parquetFilePath) throws IOException {
- LOG.info("Reading schema from " + parquetFilePath);
+ LOG.info("Reading schema from {}", parquetFilePath);
if (!fs.exists(parquetFilePath)) {
throw new IllegalArgumentException(
"Failed to read schema from data file " + parquetFilePath + ". File does not exist.");
@@ -482,7 +482,7 @@ public class HoodieHiveClient {
Statement stmt = null;
try {
stmt = connection.createStatement();
- LOG.info("Executing SQL " + s);
+ LOG.info("Executing SQL {}", s);
stmt.execute(s);
} catch (SQLException e) {
throw new HoodieHiveSyncException("Failed in executing SQL " + s, e);
@@ -513,12 +513,12 @@ public class HoodieHiveClient {
ss = SessionState.start(configuration);
hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration);
final long endTime = System.currentTimeMillis();
- LOG.info(String.format("Time taken to start SessionState and create Driver: %s ms", (endTime - startTime)));
+ LOG.info("Time taken to start SessionState and create Driver: {} ms", (endTime - startTime));
for (String sql : sqls) {
final long start = System.currentTimeMillis();
responses.add(hiveDriver.run(sql));
final long end = System.currentTimeMillis();
- LOG.info(String.format("Time taken to execute [%s]: %s ms", sql, (end - start)));
+ LOG.info("Time taken to execute [{}]: {} ms", sql, (end - start));
}
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed in executing SQL", e);
@@ -552,7 +552,7 @@ public class HoodieHiveClient {
try {
this.connection = DriverManager.getConnection(syncConfig.jdbcUrl, syncConfig.hiveUser, syncConfig.hivePass);
- LOG.info("Successfully established Hive connection to " + syncConfig.jdbcUrl);
+ LOG.info("Successfully established Hive connection to {}", syncConfig.jdbcUrl);
} catch (SQLException e) {
throw new HoodieHiveSyncException("Cannot create hive connection " + getHiveJdbcUrlWithDefaultDBName(), e);
}
@@ -630,14 +630,14 @@ public class HoodieHiveClient {
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSynced) {
if (!lastCommitTimeSynced.isPresent()) {
- LOG.info("Last commit time synced is not known, listing all partitions in " + syncConfig.basePath + ",FS :" + fs);
+ LOG.info("Last commit time synced is not known, listing all partitions in {}, FS :{}", syncConfig.basePath, fs);
try {
return FSUtils.getAllPartitionPaths(fs, syncConfig.basePath, syncConfig.assumeDatePartitioning);
} catch (IOException e) {
throw new HoodieIOException("Failed to list all partitions in " + syncConfig.basePath, e);
}
} else {
- LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
+ LOG.info("Last commit time synced is {}, Getting commits since then", lastCommitTimeSynced.get());
HoodieTimeline timelineToSync = activeTimeline.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE);
return timelineToSync.getInstants().map(s -> {
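
[Editor's note, not part of the commit] The HoodieHiveClient hunks above also drop String.format wrappers: SLF4J's "{}" placeholders defer message assembly until the level check passes, so the explicit format call is unnecessary. SLF4J additionally treats a trailing Throwable argument specially and prints its stack trace. A short illustration using the same variable names as the diff (shown here only as a sketch of the behavior):

    // Before: the message is formatted eagerly, even if INFO is disabled
    // LOG.info(String.format("Time taken to execute [%s]: %s ms", sql, (end - start)));

    // After: formatting happens only when INFO is enabled
    LOG.info("Time taken to execute [{}]: {} ms", sql, (end - start));

    // A final Throwable after the placeholder arguments is logged with its stack trace
    LOG.error("server {}:{} not up ", hostname, port, e);
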
diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java b/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
index d945b58..77c04b5 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
@@ -31,8 +31,8 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.GroupType;
@@ -53,7 +53,7 @@ import java.util.stream.Collectors;
*/
public class SchemaUtil {
- private static final Logger LOG = LogManager.getLogger(SchemaUtil.class);
+ private static final Logger LOG = LoggerFactory.getLogger(SchemaUtil.class);
public static final String HIVE_ESCAPE_CHARACTER = "`";
/**
@@ -67,7 +67,7 @@ public class SchemaUtil {
} catch (IOException e) {
throw new HoodieHiveSyncException("Failed to convert parquet schema to hive schema", e);
}
- LOG.info("Getting schema difference for " + tableSchema + "\r\n\r\n" + newTableSchema);
+ LOG.info("Getting schema difference for tableSchema :{}, newTableSchema :{}", tableSchema, newTableSchema);
SchemaDifference.Builder schemaDiffBuilder = SchemaDifference.newBuilder(storageSchema, tableSchema);
Set<String> tableColumns = Sets.newHashSet();
@@ -85,7 +85,7 @@ public class SchemaUtil {
continue;
}
// We will log this and continue. Hive schema is a superset of all parquet schemas
- LOG.warn("Ignoring table column " + fieldName + " as its not present in the parquet schema");
+ LOG.warn("Ignoring table column {} as its not present in the parquet schema", fieldName);
continue;
}
tableColumnType = tableColumnType.replaceAll("\\s+", "");
@@ -112,7 +112,7 @@ public class SchemaUtil {
schemaDiffBuilder.addTableColumn(entry.getKey(), entry.getValue());
}
}
- LOG.info("Difference between schemas: " + schemaDiffBuilder.build().toString());
+ LOG.info("Difference between schemas: {}", schemaDiffBuilder.build().toString());
return schemaDiffBuilder.build();
}
diff --git a/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java b/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java
index d82c33b..5118d19 100644
--- a/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java
+++ b/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java
@@ -35,8 +35,8 @@ import org.apache.hadoop.hive.metastore.TUGIBasedProcessor;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.thrift.TUGIContainingTransport;
import org.apache.hive.service.server.HiveServer2;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
@@ -59,7 +59,7 @@ import java.util.concurrent.Executors;
public class HiveTestService {
- private static final Logger LOG = LogManager.getLogger(HiveTestService.class);
+ private static final Logger LOG = LoggerFactory.getLogger(HiveTestService.class);
private static final int CONNECTION_TIMEOUT = 30000;
@@ -95,7 +95,7 @@ public class HiveTestService {
String localHiveLocation = getHiveLocation(workDir);
if (clean) {
- LOG.info("Cleaning Hive cluster data at: " + localHiveLocation + " and starting fresh.");
+ LOG.info("Cleaning Hive cluster data at: {} and starting fresh.", localHiveLocation);
File file = new File(localHiveLocation);
FileIOUtils.deleteDirectory(file);
}
@@ -155,7 +155,7 @@ public class HiveTestService {
return true;
} catch (MetaException e) {
// ignore as this is expected
- LOG.info("server " + hostname + ":" + port + " not up " + e);
+ LOG.error("server {}:{} not up ", hostname, port, e);
}
if (System.currentTimeMillis() > start + timeout) {