Posted to commits@hudi.apache.org by le...@apache.org on 2020/01/10 01:38:41 UTC

[incubator-hudi] branch redo-log updated: [HUDI-459] Redo hudi-hive log statements using SLF4J (#1203)

This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch redo-log
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/redo-log by this push:
     new ac105e6  [HUDI-459] Redo hudi-hive log statements using SLF4J (#1203)
ac105e6 is described below

commit ac105e6d9b6dcd75f8145af6ce03600af40180e0
Author: lamber-ken <la...@163.com>
AuthorDate: Fri Jan 10 09:38:34 2020 +0800

    [HUDI-459] Redo hudi-hive log statements using SLF4J (#1203)
---
 hudi-hive/pom.xml                                  |  5 +++
 .../java/org/apache/hudi/hive/HiveSyncTool.java    | 30 +++++++--------
 .../org/apache/hudi/hive/HoodieHiveClient.java     | 44 +++++++++++-----------
 .../java/org/apache/hudi/hive/util/SchemaUtil.java | 12 +++---
 .../org/apache/hudi/hive/util/HiveTestService.java | 10 ++---
 5 files changed, 53 insertions(+), 48 deletions(-)

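[Editor's illustration, not part of the commit] The patch below applies one pattern repeatedly: replace the log4j LogManager/Logger pair with SLF4J's LoggerFactory/Logger, and replace eager string concatenation in log calls with {} placeholders, which are only substituted when the log level is enabled. A Throwable passed as the final argument after the placeholder arguments is logged with its stack trace, as in the HiveTestService change near the end of this patch. The following minimal standalone sketch shows the pattern; the class name and values are illustrative only and do not come from the Hudi codebase.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class Slf4jMigrationSketch {

      // Before (log4j): LOG = LogManager.getLogger(Slf4jMigrationSketch.class);
      // After (SLF4J): obtain the logger from LoggerFactory instead.
      private static final Logger LOG = LoggerFactory.getLogger(Slf4jMigrationSketch.class);

      public static void main(String[] args) {
        String tableName = "my_table";   // illustrative value only
        int partitionCount = 42;         // illustrative value only

        // log4j style: the message string is concatenated even if INFO is disabled.
        // LOG.info("Adding partitions " + partitionCount + " to table " + tableName);

        // SLF4J style: {} placeholders are filled in only when INFO is enabled.
        LOG.info("Adding partitions {} to table {}", partitionCount, tableName);

        // A Throwable as the last argument (after the placeholder arguments)
        // is printed with its full stack trace.
        try {
          throw new IllegalStateException("server not reachable");
        } catch (IllegalStateException e) {
          LOG.error("server {}:{} not up", "localhost", 10000, e);
        }
      }
    }

Compiling this sketch requires only the slf4j-api artifact (plus a binding such as slf4j-log4j12 at runtime), which is why the pom.xml hunk below adds slf4j-api at ${slf4j.version} alongside the existing log4j dependency.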
diff --git a/hudi-hive/pom.xml b/hudi-hive/pom.xml
index c552b70..1ab2533 100644
--- a/hudi-hive/pom.xml
+++ b/hudi-hive/pom.xml
@@ -49,6 +49,11 @@
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>${slf4j.version}</version>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.parquet</groupId>
diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
index 6bcb697..4029096 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncTool.java
@@ -34,8 +34,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.parquet.schema.MessageType;
 
 import java.util.List;
@@ -52,7 +52,7 @@ import java.util.stream.Collectors;
 @SuppressWarnings("WeakerAccess")
 public class HiveSyncTool {
 
-  private static final Logger LOG = LogManager.getLogger(HiveSyncTool.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HiveSyncTool.class);
   private final HoodieHiveClient hoodieHiveClient;
   public static final String SUFFIX_REALTIME_TABLE = "_rt";
   private final HiveSyncConfig cfg;
@@ -79,7 +79,7 @@ public class HiveSyncTool {
           cfg.tableName = originalTableName;
           break;
         default:
-          LOG.error("Unknown table type " + hoodieHiveClient.getTableType());
+          LOG.error("Unknown table type {}", hoodieHiveClient.getTableType());
           throw new InvalidDatasetException(hoodieHiveClient.getBasePath());
       }
     } catch (RuntimeException re) {
@@ -90,8 +90,8 @@ public class HiveSyncTool {
   }
 
   private void syncHoodieTable(boolean isRealTime) throws ClassNotFoundException {
-    LOG.info("Trying to sync hoodie table " + cfg.tableName + " with base path " + hoodieHiveClient.getBasePath()
-        + " of type " + hoodieHiveClient.getTableType());
+    LOG.info("Trying to sync hoodie table {} with base path {} of type {}",
+        cfg.tableName, hoodieHiveClient.getBasePath(), hoodieHiveClient.getTableType());
 
     // Check if the necessary table exists
     boolean tableExists = hoodieHiveClient.doesTableExist();
@@ -100,20 +100,20 @@ public class HiveSyncTool {
     // Sync schema if needed
     syncSchema(tableExists, isRealTime, schema);
 
-    LOG.info("Schema sync complete. Syncing partitions for " + cfg.tableName);
+    LOG.info("Schema sync complete. Syncing partitions for {}", cfg.tableName);
     // Get the last time we successfully synced partitions
     Option<String> lastCommitTimeSynced = Option.empty();
     if (tableExists) {
       lastCommitTimeSynced = hoodieHiveClient.getLastCommitTimeSynced();
     }
-    LOG.info("Last commit time synced was found to be " + lastCommitTimeSynced.orElse("null"));
+    LOG.info("Last commit time synced was found to be {}", lastCommitTimeSynced.orElse("null"));
     List<String> writtenPartitionsSince = hoodieHiveClient.getPartitionsWrittenToSince(lastCommitTimeSynced);
-    LOG.info("Storage partitions scan complete. Found " + writtenPartitionsSince.size());
+    LOG.info("Storage partitions scan complete. Found {}", writtenPartitionsSince.size());
     // Sync the partitions if needed
     syncPartitions(writtenPartitionsSince);
 
     hoodieHiveClient.updateLastCommitTimeSynced();
-    LOG.info("Sync complete for " + cfg.tableName);
+    LOG.info("Sync complete for {}", cfg.tableName);
   }
 
   /**
@@ -126,7 +126,7 @@ public class HiveSyncTool {
   private void syncSchema(boolean tableExists, boolean isRealTime, MessageType schema) throws ClassNotFoundException {
     // Check and sync schema
     if (!tableExists) {
-      LOG.info("Table " + cfg.tableName + " is not found. Creating it");
+      LOG.info("Table {} is not found. Creating it", cfg.tableName);
       if (!isRealTime) {
         // TODO - RO Table for MOR only after major compaction (UnboundedCompaction is default
         // for now)
@@ -150,10 +150,10 @@ public class HiveSyncTool {
       Map<String, String> tableSchema = hoodieHiveClient.getTableSchema();
       SchemaDifference schemaDiff = SchemaUtil.getSchemaDifference(schema, tableSchema, cfg.partitionFields);
       if (!schemaDiff.isEmpty()) {
-        LOG.info("Schema difference found for " + cfg.tableName);
+        LOG.info("Schema difference found for {}", cfg.tableName);
         hoodieHiveClient.updateTableDefinition(schema);
       } else {
-        LOG.info("No Schema difference for " + cfg.tableName);
+        LOG.info("No Schema difference for {}", cfg.tableName);
       }
     }
   }
@@ -168,10 +168,10 @@ public class HiveSyncTool {
       List<PartitionEvent> partitionEvents =
           hoodieHiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
       List<String> newPartitions = filterPartitions(partitionEvents, PartitionEventType.ADD);
-      LOG.info("New Partitions " + newPartitions);
+      LOG.info("New Partitions {}", newPartitions);
       hoodieHiveClient.addPartitionsToTable(newPartitions);
       List<String> updatePartitions = filterPartitions(partitionEvents, PartitionEventType.UPDATE);
-      LOG.info("Changed Partitions " + updatePartitions);
+      LOG.info("Changed Partitions {}", updatePartitions);
       hoodieHiveClient.updatePartitionsToTable(updatePartitions);
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to sync partitions for table " + cfg.tableName, e);
diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index d176500..820e59b 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -48,8 +48,8 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.jdbc.HiveDriver;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -87,7 +87,7 @@ public class HoodieHiveClient {
     }
   }
 
-  private static final Logger LOG = LogManager.getLogger(HoodieHiveClient.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieHiveClient.class);
   private final HoodieTableMetaClient metaClient;
   private final HoodieTableType tableType;
   private final PartitionValueExtractor partitionValueExtractor;
@@ -108,7 +108,7 @@ public class HoodieHiveClient {
     // Support both JDBC and metastore based implementations for backwards compatiblity. Future users should
     // disable jdbc and depend on metastore client for all hive registrations
     if (cfg.useJdbc) {
-      LOG.info("Creating hive connection " + cfg.jdbcUrl);
+      LOG.info("Creating hive connection {}", cfg.jdbcUrl);
       createHiveConnection();
     }
     try {
@@ -137,10 +137,10 @@ public class HoodieHiveClient {
    */
   void addPartitionsToTable(List<String> partitionsToAdd) {
     if (partitionsToAdd.isEmpty()) {
-      LOG.info("No partitions to add for " + syncConfig.tableName);
+      LOG.info("No partitions to add for {}", syncConfig.tableName);
       return;
     }
-    LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + syncConfig.tableName);
+    LOG.info("Adding partitions {} to table {}", partitionsToAdd.size(), syncConfig.tableName);
     String sql = constructAddPartitions(partitionsToAdd);
     updateHiveSQL(sql);
   }
@@ -150,10 +150,10 @@ public class HoodieHiveClient {
    */
   void updatePartitionsToTable(List<String> changedPartitions) {
     if (changedPartitions.isEmpty()) {
-      LOG.info("No partitions to change for " + syncConfig.tableName);
+      LOG.info("No partitions to change for {}", syncConfig.tableName);
       return;
     }
-    LOG.info("Changing partitions " + changedPartitions.size() + " on " + syncConfig.tableName);
+    LOG.info("Changing partitions {} on {}", changedPartitions.size(), syncConfig.tableName);
     List<String> sqls = constructChangePartitions(changedPartitions);
     for (String sql : sqls) {
       updateHiveSQL(sql);
@@ -260,7 +260,7 @@ public class HoodieHiveClient {
               .append(HIVE_ESCAPE_CHARACTER).append(syncConfig.tableName)
               .append(HIVE_ESCAPE_CHARACTER).append(" REPLACE COLUMNS(")
               .append(newSchemaStr).append(" )").append(cascadeClause);
-      LOG.info("Updating table definition with " + sqlBuilder);
+      LOG.info("Updating table definition with {}", sqlBuilder);
       updateHiveSQL(sqlBuilder.toString());
     } catch (IOException e) {
       throw new HoodieHiveSyncException("Failed to update table for " + syncConfig.tableName, e);
@@ -271,7 +271,7 @@ public class HoodieHiveClient {
     try {
       String createSQLQuery =
           SchemaUtil.generateCreateDDL(storageSchema, syncConfig, inputFormatClass, outputFormatClass, serdeClass);
-      LOG.info("Creating table with " + createSQLQuery);
+      LOG.info("Creating table with {}", createSQLQuery);
       updateHiveSQL(createSQLQuery);
     } catch (IOException e) {
       throw new HoodieHiveSyncException("Failed to create table " + syncConfig.tableName, e);
@@ -329,7 +329,7 @@ public class HoodieHiveClient {
       schema.putAll(columnsMap);
       schema.putAll(partitionKeysMap);
       final long end = System.currentTimeMillis();
-      LOG.info(String.format("Time taken to getTableSchema: %s ms", (end - start)));
+      LOG.info("Time taken to getTableSchema: {} ms", (end - start));
       return schema;
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to get table schema for : " + syncConfig.tableName, e);
@@ -364,7 +364,7 @@ public class HoodieHiveClient {
           // Get a datafile written and get the schema from that file
           Option<HoodieInstant> lastCompactionCommit =
               metaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().lastInstant();
-          LOG.info("Found the last compaction commit as " + lastCompactionCommit);
+          LOG.info("Found the last compaction commit as {}", lastCompactionCommit);
 
           Option<HoodieInstant> lastDeltaCommit;
           if (lastCompactionCommit.isPresent()) {
@@ -374,7 +374,7 @@ public class HoodieHiveClient {
             lastDeltaCommit =
                 metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant();
           }
-          LOG.info("Found the last delta commit " + lastDeltaCommit);
+          LOG.info("Found the last delta commit {}", lastDeltaCommit);
 
           if (lastDeltaCommit.isPresent()) {
             HoodieInstant lastDeltaInstant = lastDeltaCommit.get();
@@ -407,7 +407,7 @@ public class HoodieHiveClient {
             return readSchemaFromLastCompaction(lastCompactionCommit);
           }
         default:
-          LOG.error("Unknown table type " + tableType);
+          LOG.error("Unknown table type {}", tableType);
           throw new InvalidDatasetException(syncConfig.basePath);
       }
     } catch (IOException e) {
@@ -441,7 +441,7 @@ public class HoodieHiveClient {
     MessageType messageType = SchemaUtil.readSchemaFromLogFile(fs, path);
     // Fall back to read the schema from last compaction
     if (messageType == null) {
-      LOG.info("Falling back to read the schema from last compaction " + lastCompactionCommitOpt);
+      LOG.info("Falling back to read the schema from last compaction {}", lastCompactionCommitOpt);
       return readSchemaFromLastCompaction(lastCompactionCommitOpt);
     }
     return messageType;
@@ -451,7 +451,7 @@ public class HoodieHiveClient {
    * Read the parquet schema from a parquet File.
    */
   private MessageType readSchemaFromDataFile(Path parquetFilePath) throws IOException {
-    LOG.info("Reading schema from " + parquetFilePath);
+    LOG.info("Reading schema from {}", parquetFilePath);
     if (!fs.exists(parquetFilePath)) {
       throw new IllegalArgumentException(
           "Failed to read schema from data file " + parquetFilePath + ". File does not exist.");
@@ -482,7 +482,7 @@ public class HoodieHiveClient {
       Statement stmt = null;
       try {
         stmt = connection.createStatement();
-        LOG.info("Executing SQL " + s);
+        LOG.info("Executing SQL {}", s);
         stmt.execute(s);
       } catch (SQLException e) {
         throw new HoodieHiveSyncException("Failed in executing SQL " + s, e);
@@ -513,12 +513,12 @@ public class HoodieHiveClient {
       ss = SessionState.start(configuration);
       hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration);
       final long endTime = System.currentTimeMillis();
-      LOG.info(String.format("Time taken to start SessionState and create Driver: %s ms", (endTime - startTime)));
+      LOG.info("Time taken to start SessionState and create Driver: {} ms", (endTime - startTime));
       for (String sql : sqls) {
         final long start = System.currentTimeMillis();
         responses.add(hiveDriver.run(sql));
         final long end = System.currentTimeMillis();
-        LOG.info(String.format("Time taken to execute [%s]: %s ms", sql, (end - start)));
+        LOG.info("Time taken to execute [{}]: {} ms", sql, (end - start));
       }
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed in executing SQL", e);
@@ -552,7 +552,7 @@ public class HoodieHiveClient {
 
       try {
         this.connection = DriverManager.getConnection(syncConfig.jdbcUrl, syncConfig.hiveUser, syncConfig.hivePass);
-        LOG.info("Successfully established Hive connection to  " + syncConfig.jdbcUrl);
+        LOG.info("Successfully established Hive connection to {}", syncConfig.jdbcUrl);
       } catch (SQLException e) {
         throw new HoodieHiveSyncException("Cannot create hive connection " + getHiveJdbcUrlWithDefaultDBName(), e);
       }
@@ -630,14 +630,14 @@ public class HoodieHiveClient {
   @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
   List<String> getPartitionsWrittenToSince(Option<String> lastCommitTimeSynced) {
     if (!lastCommitTimeSynced.isPresent()) {
-      LOG.info("Last commit time synced is not known, listing all partitions in " + syncConfig.basePath + ",FS :" + fs);
+      LOG.info("Last commit time synced is not known, listing all partitions in {}, FS :{}", syncConfig.basePath, fs);
       try {
         return FSUtils.getAllPartitionPaths(fs, syncConfig.basePath, syncConfig.assumeDatePartitioning);
       } catch (IOException e) {
         throw new HoodieIOException("Failed to list all partitions in " + syncConfig.basePath, e);
       }
     } else {
-      LOG.info("Last commit time synced is " + lastCommitTimeSynced.get() + ", Getting commits since then");
+      LOG.info("Last commit time synced is {}, Getting commits since then", lastCommitTimeSynced.get());
 
       HoodieTimeline timelineToSync = activeTimeline.findInstantsAfter(lastCommitTimeSynced.get(), Integer.MAX_VALUE);
       return timelineToSync.getInstants().map(s -> {
diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java b/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
index d945b58..77c04b5 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/util/SchemaUtil.java
@@ -31,8 +31,8 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.schema.DecimalMetadata;
 import org.apache.parquet.schema.GroupType;
@@ -53,7 +53,7 @@ import java.util.stream.Collectors;
  */
 public class SchemaUtil {
 
-  private static final Logger LOG = LogManager.getLogger(SchemaUtil.class);
+  private static final Logger LOG = LoggerFactory.getLogger(SchemaUtil.class);
   public static final String HIVE_ESCAPE_CHARACTER = "`";
 
   /**
@@ -67,7 +67,7 @@ public class SchemaUtil {
     } catch (IOException e) {
       throw new HoodieHiveSyncException("Failed to convert parquet schema to hive schema", e);
     }
-    LOG.info("Getting schema difference for " + tableSchema + "\r\n\r\n" + newTableSchema);
+    LOG.info("Getting schema difference for tableSchema :{}, newTableSchema :{}", tableSchema, newTableSchema);
     SchemaDifference.Builder schemaDiffBuilder = SchemaDifference.newBuilder(storageSchema, tableSchema);
     Set<String> tableColumns = Sets.newHashSet();
 
@@ -85,7 +85,7 @@ public class SchemaUtil {
             continue;
           }
           // We will log this and continue. Hive schema is a superset of all parquet schemas
-          LOG.warn("Ignoring table column " + fieldName + " as its not present in the parquet schema");
+          LOG.warn("Ignoring table column {} as its not present in the parquet schema", fieldName);
           continue;
         }
         tableColumnType = tableColumnType.replaceAll("\\s+", "");
@@ -112,7 +112,7 @@ public class SchemaUtil {
         schemaDiffBuilder.addTableColumn(entry.getKey(), entry.getValue());
       }
     }
-    LOG.info("Difference between schemas: " + schemaDiffBuilder.build().toString());
+    LOG.info("Difference between schemas: {}", schemaDiffBuilder.build().toString());
 
     return schemaDiffBuilder.build();
   }
diff --git a/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java b/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java
index d82c33b..5118d19 100644
--- a/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java
+++ b/hudi-hive/src/test/java/org/apache/hudi/hive/util/HiveTestService.java
@@ -35,8 +35,8 @@ import org.apache.hadoop.hive.metastore.TUGIBasedProcessor;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.thrift.TUGIContainingTransport;
 import org.apache.hive.service.server.HiveServer2;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.server.TServer;
@@ -59,7 +59,7 @@ import java.util.concurrent.Executors;
 
 public class HiveTestService {
 
-  private static final Logger LOG = LogManager.getLogger(HiveTestService.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HiveTestService.class);
 
   private static final int CONNECTION_TIMEOUT = 30000;
 
@@ -95,7 +95,7 @@ public class HiveTestService {
 
     String localHiveLocation = getHiveLocation(workDir);
     if (clean) {
-      LOG.info("Cleaning Hive cluster data at: " + localHiveLocation + " and starting fresh.");
+      LOG.info("Cleaning Hive cluster data at: {} and starting fresh.", localHiveLocation);
       File file = new File(localHiveLocation);
       FileIOUtils.deleteDirectory(file);
     }
@@ -155,7 +155,7 @@ public class HiveTestService {
         return true;
       } catch (MetaException e) {
         // ignore as this is expected
-        LOG.info("server " + hostname + ":" + port + " not up " + e);
+        LOG.error("server {}:{} not up ", hostname, port, e);
       }
 
       if (System.currentTimeMillis() > start + timeout) {