Posted to commits@hudi.apache.org by na...@apache.org on 2019/05/24 00:35:58 UTC

[incubator-hudi] branch master updated: Allow users to set hoodie configs for Compactor, Cleaner and HDFSParquetImporter utility scripts

This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fe526d  Allow users to set hoodie configs for Compactor, Cleaner and HDFSParquetImporter utility scripts
2fe526d is described below

commit 2fe526d5485963493e44d9c6cb2a80cb84244c1f
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Thu May 23 16:35:30 2019 -0700

    Allow users to set hoodie configs for Compactor, Cleaner and HDFSParquetImporter utility scripts
---
 .../uber/hoodie/utilities/HDFSParquetImporter.java    | 18 +++++++++++++++++-
 .../java/com/uber/hoodie/utilities/HoodieCleaner.java | 12 +++++-------
 .../com/uber/hoodie/utilities/HoodieCompactor.java    | 19 +++++++++++++++++--
 .../java/com/uber/hoodie/utilities/UtilHelpers.java   | 15 ++++++++++++++-
 4 files changed, 53 insertions(+), 11 deletions(-)

diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java
index 9e6f455..7cda533 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HDFSParquetImporter.java
@@ -30,10 +30,12 @@ import com.uber.hoodie.common.model.HoodieRecordPayload;
 import com.uber.hoodie.common.table.HoodieTableConfig;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.TypedProperties;
 import com.uber.hoodie.exception.HoodieIOException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
@@ -57,14 +59,22 @@ import scala.Tuple2;
  * Loads data from Parquet Sources
  */
 public class HDFSParquetImporter implements Serializable {
+  private static volatile Logger log = LogManager.getLogger(HDFSParquetImporter.class);
 
   public static final SimpleDateFormat PARTITION_FORMATTER = new SimpleDateFormat("yyyy/MM/dd");
   private static volatile Logger logger = LogManager.getLogger(HDFSParquetImporter.class);
   private final Config cfg;
   private transient FileSystem fs;
+  /**
+   * Bag of properties with source, hoodie client, key generator etc.
+   */
+  private TypedProperties props;
 
   public HDFSParquetImporter(Config cfg) throws IOException {
     this.cfg = cfg;
+    this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) :
+        UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
+    log.info("Creating Cleaner with configs : " + props.toString());
   }
 
   public static void main(String[] args) throws Exception {
@@ -116,7 +126,7 @@ public class HDFSParquetImporter implements Serializable {
           .initializePathAsHoodieDataset(jsc.hadoopConfiguration(), cfg.targetPath, properties);
 
       HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.targetPath, schemaStr,
-          cfg.parallelism, Optional.empty());
+          cfg.parallelism, Optional.empty(), props);
 
       JavaRDD<HoodieRecord<HoodieRecordPayload>> hoodieRecords = buildHoodieRecordsForImport(jsc, schemaStr);
       // Get instant time.
@@ -247,6 +257,12 @@ public class HDFSParquetImporter implements Serializable {
     public String sparkMemory = null;
     @Parameter(names = {"--retry", "-rt"}, description = "number of retries", required = false)
     public int retry = 0;
+    @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+        + "hoodie client for importing")
+    public String propsFilePath = null;
+    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+        + "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter")
+    public List<String> configs = new ArrayList<>();
     @Parameter(names = {"--help", "-h"}, help = true)
     public Boolean help = false;
   }
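
For reference, the two new flags mirror each other: "--props" points at a properties file on local or distributed storage, while repeated "--hoodie-conf key=value" arguments supply the same settings inline. A minimal, hypothetical sketch of driving the importer with inline settings only (the config key is illustrative, and it assumes Config remains the public static parameter holder shown above):

    import com.uber.hoodie.utilities.HDFSParquetImporter;

    public class InlineConfigExample {
      public static void main(String[] args) throws Exception {
        HDFSParquetImporter.Config cfg = new HDFSParquetImporter.Config();
        // Equivalent to passing: --hoodie-conf hoodie.upsert.shuffle.parallelism=100
        cfg.configs.add("hoodie.upsert.shuffle.parallelism=100");
        // cfg.propsFilePath stays null, so the new constructor falls back to
        // UtilHelpers.buildProperties(cfg.configs) instead of reading a file.
        HDFSParquetImporter importer = new HDFSParquetImporter(cfg);
      }
    }
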
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCleaner.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCleaner.java
index 7ebca04..3c25b6b 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCleaner.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCleaner.java
@@ -22,7 +22,6 @@ import com.uber.hoodie.HoodieWriteClient;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.TypedProperties;
 import com.uber.hoodie.config.HoodieWriteConfig;
-import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -35,7 +34,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 
 public class HoodieCleaner {
 
-  private static volatile Logger log = LogManager.getLogger(HoodieDeltaStreamer.class);
+  private static volatile Logger log = LogManager.getLogger(HoodieCleaner.class);
 
   /**
    * Config for Cleaner
@@ -55,14 +54,14 @@ public class HoodieCleaner {
   /**
    * Bag of properties with source, hoodie client, key generator etc.
    */
-  TypedProperties props;
+  private TypedProperties props;
 
   public HoodieCleaner(Config cfg, JavaSparkContext jssc) throws IOException {
     this.cfg = cfg;
     this.jssc = jssc;
     this.fs = FSUtils.getFs(cfg.basePath, jssc.hadoopConfiguration());
-
-    this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
+    this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) :
+        UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
     log.info("Creating Cleaner with configs : " + props.toString());
   }
 
@@ -86,8 +85,7 @@ public class HoodieCleaner {
 
     @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
         + "hoodie client for cleaning")
-    public String propsFilePath =
-        "file://" + System.getProperty("user.dir") + "/src/test/resources/delta-streamer-config/dfs-source.properties";
+    public String propsFilePath = null;
 
     @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
         + "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter")
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactor.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactor.java
index 8aa5967..f7e8de4 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactor.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/HoodieCompactor.java
@@ -23,9 +23,13 @@ import com.beust.jcommander.Parameter;
 import com.uber.hoodie.HoodieWriteClient;
 import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.TypedProperties;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -37,9 +41,12 @@ public class HoodieCompactor {
   private static volatile Logger logger = LogManager.getLogger(HoodieCompactor.class);
   private final Config cfg;
   private transient FileSystem fs;
+  private TypedProperties props;
 
   public HoodieCompactor(Config cfg) {
     this.cfg = cfg;
+    this.props = cfg.propsFilePath == null ? UtilHelpers.buildProperties(cfg.configs) :
+        UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
   }
 
   public static class Config implements Serializable {
@@ -70,6 +77,14 @@ public class HoodieCompactor {
     public String strategyClassName = null;
     @Parameter(names = {"--help", "-h"}, help = true)
     public Boolean help = false;
+
+    @Parameter(names = {"--props"}, description = "path to properties file on localfs or dfs, with configurations for "
+        + "hoodie client for compacting")
+    public String propsFilePath = null;
+
+    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file "
+        + "(using the CLI parameter \"--propsFilePath\") can also be passed command line using this parameter")
+    public List<String> configs = new ArrayList<>();
   }
 
   public static void main(String[] args) throws Exception {
@@ -108,7 +123,7 @@ public class HoodieCompactor {
     //Get schema.
     String schemaStr = UtilHelpers.parseSchema(fs, cfg.schemaFile);
     HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, schemaStr, cfg.parallelism,
-        Optional.empty());
+        Optional.empty(), props);
     JavaRDD<WriteStatus> writeResponse = client.compact(cfg.compactionInstantTime);
     return UtilHelpers.handleErrors(jsc, cfg.compactionInstantTime, writeResponse);
   }
@@ -116,7 +131,7 @@ public class HoodieCompactor {
   private int doSchedule(JavaSparkContext jsc) throws Exception {
     //Get schema.
     HoodieWriteClient client = UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism,
-        Optional.of(cfg.strategyClassName));
+        Optional.of(cfg.strategyClassName), props);
     client.scheduleCompactionAtInstant(cfg.compactionInstantTime, Optional.empty());
     return 0;
   }
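
Both compactor entry points now thread the resolved properties into the write client, so anything supplied via "--props" or "--hoodie-conf" reaches the HoodieWriteConfig built in UtilHelpers (see the withProps(...) call in the next diff). A sketch of the updated call shape, with placeholder arguments:

    // Executing a compaction: no strategy class is needed.
    HoodieWriteClient client = UtilHelpers.createHoodieClient(
        jsc, cfg.basePath, schemaStr, cfg.parallelism, Optional.empty(), props);

    // Scheduling one: the strategy class is passed instead; schema is unused.
    // UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism,
    //     Optional.of(cfg.strategyClassName), props);
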
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java
index b24e3a2..0449c40 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java
@@ -18,6 +18,7 @@
 
 package com.uber.hoodie.utilities;
 
+import com.google.common.base.Preconditions;
 import com.uber.hoodie.HoodieWriteClient;
 import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.common.util.DFSPropertiesConfiguration;
@@ -100,6 +101,16 @@ public class UtilHelpers {
     }
   }
 
+  public static TypedProperties buildProperties(List<String> props) {
+    TypedProperties properties = new TypedProperties();
+    props.stream().forEach(x -> {
+      String[] kv = x.split("=");
+      Preconditions.checkArgument(kv.length == 2);
+      properties.setProperty(kv[0], kv[1]);
+    });
+    return properties;
+  }
+
   /**
    * Parse Schema from file
    *
@@ -163,7 +174,8 @@ public class UtilHelpers {
    * @param parallelism Parallelism
    */
   public static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, String basePath,
-      String schemaStr, int parallelism, Optional<String> compactionStrategyClass) throws Exception {
+      String schemaStr, int parallelism, Optional<String> compactionStrategyClass, TypedProperties properties)
+      throws Exception {
     HoodieCompactionConfig compactionConfig =
         compactionStrategyClass.map(strategy -> HoodieCompactionConfig.newBuilder().withInlineCompaction(false)
             .withCompactionStrategy(ReflectionUtils.loadClass(strategy))
@@ -173,6 +185,7 @@ public class UtilHelpers {
         .combineInput(true, true)
         .withCompactionConfig(compactionConfig)
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+        .withProps(properties)
         .build();
     return new HoodieWriteClient(jsc, config);
   }
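
One caveat about buildProperties as written: each "--hoodie-conf" argument is split on every "=" and must yield exactly two parts, so values that themselves contain "=" fail the precondition. An illustrative sketch (the keys are examples, not taken from this commit):

    TypedProperties props = UtilHelpers.buildProperties(java.util.Arrays.asList(
        "hoodie.datasource.write.recordkey.field=_row_key",   // ok: single '='
        "hoodie.upsert.shuffle.parallelism=2"));              // ok: single '='
    // "some.key=a=b" would trip Preconditions.checkArgument(kv.length == 2);
    // splitting with x.split("=", 2) would be the usual way to accept it.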