Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/07/25 00:59:27 UTC

[GitHub] [hudi] vinothchandar opened a new pull request #1876: [HUDI-242] Support for RFC-12/Bootstrapping of external datasets

vinothchandar opened a new pull request #1876:
URL: https://github.com/apache/hudi/pull/1876


    - [HUDI-418] Bootstrap Index Implementation using HFile with unit-test
    - [HUDI-421] FileSystem View Changes to support Bootstrap with unit-tests
    - [HUDI-424] Implement Query Side Integration for querying tables containing bootstrap file slices
    - [HUDI-423] Implement upsert functionality for handling updates to these bootstrap file slices
    - [HUDI-421] Bootstrap Write Client with tests
    - [HUDI-425] Added HoodieDeltaStreamer support
    - [HUDI-899] Add a knob to change partition-path style while performing metadata bootstrap
    - [HUDI-900] Metadata Bootstrap Key Generator needs to handle complex keys correctly
    - [HUDI-424] Simplify Record reader implementation
    - [HUDI-423] Implement upsert functionality for handling updates to these bootstrap file slices
    - [HUDI-420] Hoodie Demo working with hive and sparkSQL. Also, Hoodie CLI working with bootstrap tables
   
   Co-authored-by: Mehrotra <ud...@amazon.com>
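   
   A minimal usage sketch of the new bootstrap write path, condensed from the TestBootstrap code quoted later in this thread. The source base path and key generator choice are placeholders, and `basePath`, `schema`, and `jsc` are assumed to come from the surrounding test setup:
   
       // Point the writer at an existing (non-Hudi) parquet dataset and bootstrap it in place.
       HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
           .withPath(basePath)
           .withSchema(schema.toString())
           .withBootstrapConfig(HoodieBootstrapConfig.newBuilder()
               .withBootstrapSourceBasePath("/data/source/table")             // placeholder source path
               .withBootstrapKeyGenClass(SimpleKeyGenerator.class.getName())  // how record keys are derived
               .withBootstrapModeSelector(RecordMetadataOnlyBootstrapModeSelector.class.getCanonicalName())
               .build())
           .build();
   
       HoodieWriteClient client = new HoodieWriteClient(jsc, config);
       client.bootstrap(Option.empty());  // runs metadata-only/full bootstrap per the configured mode selector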
   
   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contributing.html before opening a pull request.*
   
   ## What is the purpose of the pull request
   
   *(For example: This pull request adds quick-start document.)*
   
   ## Brief change log
   
   *(for example:)*
     - *Modify AnnotationLocation checkstyle rule in checkstyle.xml*
   
   ## Verify this pull request
   
   *(Please pick one of the following options)*
   
   This pull request is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   (or)
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
     - *Added integration tests for end-to-end.*
     - *Added HoodieClientWriteTest to verify the change.*
     - *Manually verified the change by running a job locally.*
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
     - [ ] For large changes, please consider breaking them into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] umehrot2 commented on a change in pull request #1876: [HUDI-242] Support for RFC-12/Bootstrapping of external datasets

Posted by GitBox <gi...@apache.org>.
umehrot2 commented on a change in pull request #1876:
URL: https://github.com/apache/hudi/pull/1876#discussion_r463889895



##########
File path: hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
##########
@@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.client.bootstrap.BootstrapMode;
+import org.apache.hudi.client.bootstrap.RecordDataBootstrapInputProvider;
+import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
+import org.apache.hudi.client.bootstrap.selector.RecordDataBootstrapModeSelector;
+import org.apache.hudi.client.bootstrap.selector.RecordMetadataOnlyBootstrapModeSelector;
+import org.apache.hudi.common.bootstrap.FileStatusUtils;
+import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.testutils.RawTripTestPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieBootstrapConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.schema.MessageType;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.api.java.UDF1;
+import org.apache.spark.sql.types.DataTypes;
+
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.Random;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.generateGenericRecord;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.apache.spark.sql.functions.callUDF;
+
+/**
+ * Tests Bootstrap Client functionality.
+ */
+public class TestBootstrap extends HoodieClientTestBase {
+
+  //FIXME(bootstrap): why is this test so darn slow?
+
+  public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,"
+      + "struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";
+
+  @TempDir
+  public java.nio.file.Path tmpFolder;
+
+  protected String srcPath = null;
+
+  private HoodieParquetInputFormat roInputFormat;
+  private JobConf roJobConf;
+
+  private HoodieParquetRealtimeInputFormat rtInputFormat;
+  private JobConf rtJobConf;
+  private SparkSession spark;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    srcPath = tmpFolder.toAbsolutePath().toString() + "/data";
+    initPath();
+    spark = SparkSession.builder()
+        .appName("Bootstrap test")
+        .master("local[2]")
+        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+        .getOrCreate();
+    jsc = new JavaSparkContext(spark.sparkContext());
+    sqlContext = spark.sqlContext();
+    hadoopConf = spark.sparkContext().hadoopConfiguration();
+    initTestDataGenerator();
+    initMetaClient();
+    // initialize parquet input format
+    reloadInputFormats();
+  }
+
+  @AfterEach
+  public void tearDown() throws IOException {
+    cleanupClients();
+    cleanupTestDataGenerator();
+  }
+
+  private void reloadInputFormats() {
+    // initialize parquet input format
+    roInputFormat = new HoodieParquetInputFormat();
+    roJobConf = new JobConf(jsc.hadoopConfiguration());
+    roInputFormat.setConf(roJobConf);
+
+    rtInputFormat = new HoodieParquetRealtimeInputFormat();
+    rtJobConf = new JobConf(jsc.hadoopConfiguration());
+    rtInputFormat.setConf(rtJobConf);
+  }
+
+  public Schema generateNewDataSetAndReturnSchema(double timestamp, int numRecords, List<String> partitionPaths,
+      String srcPath) throws Exception {
+    boolean isPartitioned = partitionPaths != null && !partitionPaths.isEmpty();
+    Dataset<Row> df = generateTestRawTripDataset(timestamp, numRecords, partitionPaths, jsc, sqlContext);
+    df.printSchema();
+    if (isPartitioned) {
+      df.write().partitionBy("datestr").format("parquet").mode(SaveMode.Overwrite).save(srcPath);
+    } else {
+      df.write().format("parquet").mode(SaveMode.Overwrite).save(srcPath);
+    }
+    String filePath = FileStatusUtils.toPath(FSUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), srcPath,
+        (status) -> status.getName().endsWith(".parquet")).stream().findAny().map(p -> p.getValue().stream().findAny())
+        .orElse(null).get().getPath()).toString();
+    ParquetFileReader reader = ParquetFileReader.open(metaClient.getHadoopConf(), new Path(filePath));
+    MessageType schema = reader.getFooter().getFileMetaData().getSchema();
+    return new AvroSchemaConverter().convert(schema);
+  }
+
+  @Test
+  public void testMetadataBootstrapUnpartitionedCOW() throws Exception {
+    testBootstrapCommon(false, false, EffectiveMode.METADATA_BOOTSTRAP_MODE);
+  }
+
+  @Test
+  public void testMetadataBootstrapWithUpdatesCOW() throws Exception {
+    testBootstrapCommon(true, false, EffectiveMode.METADATA_BOOTSTRAP_MODE);
+  }
+
+  private enum EffectiveMode {
+    FULL_BOOTSTRAP_MODE,
+    METADATA_BOOTSTRAP_MODE,
+    MIXED_BOOTSTRAP_MODE
+  }
+
+  private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, EffectiveMode mode) throws Exception {
+    if (deltaCommit) {
+      metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ);
+    }
+    int totalRecords = 100;
+    String keyGeneratorClass = partitioned ? SimpleKeyGenerator.class.getCanonicalName()
+        : NonpartitionedKeyGenerator.class.getCanonicalName();
+    final String bootstrapModeSelectorClass;
+    final String bootstrapCommitInstantTs;
+    final boolean checkNumRawFiles;
+    final boolean isBootstrapIndexCreated;
+    final int numInstantsAfterBootstrap;
+    final List<String> bootstrapInstants;
+    switch (mode) {
+      case FULL_BOOTSTRAP_MODE:
+        bootstrapModeSelectorClass = RecordDataBootstrapModeSelector.class.getCanonicalName();
+        bootstrapCommitInstantTs = HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS;
+        checkNumRawFiles = false;
+        isBootstrapIndexCreated = false;
+        numInstantsAfterBootstrap = 1;
+        bootstrapInstants = Arrays.asList(bootstrapCommitInstantTs);
+        break;
+      case METADATA_BOOTSTRAP_MODE:
+        bootstrapModeSelectorClass = RecordMetadataOnlyBootstrapModeSelector.class.getCanonicalName();
+        bootstrapCommitInstantTs = HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS;
+        checkNumRawFiles = true;
+        isBootstrapIndexCreated = true;
+        numInstantsAfterBootstrap = 1;
+        bootstrapInstants = Arrays.asList(bootstrapCommitInstantTs);
+        break;
+      default:
+        bootstrapModeSelectorClass = TestRandomBootstapModeSelector.class.getName();
+        bootstrapCommitInstantTs = HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS;
+        checkNumRawFiles = false;
+        isBootstrapIndexCreated = true;
+        numInstantsAfterBootstrap = 2;
+        bootstrapInstants = Arrays.asList(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
+            HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);
+        break;
+    }
+    List<String> partitions = Arrays.asList("2020/04/01", "2020/04/02", "2020/04/03");
+    double timestamp = new Double(Instant.now().toEpochMilli()).longValue();
+    Schema schema = generateNewDataSetAndReturnSchema(timestamp, totalRecords, partitions, srcPath);
+    HoodieWriteConfig config = getConfigBuilder(schema.toString())
+        .withAutoCommit(true)
+        .withSchema(schema.toString())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(1)
+            .build())
+        .withBootstrapConfig(HoodieBootstrapConfig.newBuilder()
+            .withBootstrapSourceBasePath(srcPath)
+            .withBootstrapKeyGenClass(keyGeneratorClass)
+            .withFullBootstrapInputProvider(FullTestBootstrapInputProvider.class.getName())
+            .withBootstrapParallelism(3)
+            .withBootstrapModeSelector(bootstrapModeSelectorClass).build())
+        .build();
+    HoodieWriteClient client = new HoodieWriteClient(jsc, config);
+    client.bootstrap(Option.empty());
+    checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
+        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants);
+
+    // Rollback Bootstrap
+    FSUtils.deleteInstantFile(metaClient.getFs(), metaClient.getMetaPath(), new HoodieInstant(State.COMPLETED,
+        deltaCommit ? HoodieTimeline.DELTA_COMMIT_ACTION : HoodieTimeline.COMMIT_ACTION, bootstrapCommitInstantTs));
+    client.rollBackInflightBootstrap();
+    metaClient.reloadActiveTimeline();
+    assertEquals(0, metaClient.getCommitsTimeline().countInstants());
+    assertEquals(0L, FSUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), basePath,
+        (status) -> status.getName().endsWith(".parquet")).stream().flatMap(f -> f.getValue().stream()).count());
+
+    BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
+    assertFalse(index.isIndexAvailable());
+
+    // Run bootstrap again
+    client = new HoodieWriteClient(jsc, config);
+    client.bootstrap(Option.empty());
+
+    metaClient.reloadActiveTimeline();
+    index = BootstrapIndex.getBootstrapIndex(metaClient);
+    if (isBootstrapIndexCreated) {
+      assertTrue(index.isIndexAvailable());
+    } else {
+      assertFalse(index.isIndexAvailable());
+    }
+
+    checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
+        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants);
+
+    // Upsert case
+    double updateTimestamp = new Double(Instant.now().toEpochMilli()).longValue();
+    String updateSPath = tmpFolder.toAbsolutePath().toString() + "/data2";
+    generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords, partitions, updateSPath);
+    JavaRDD<HoodieRecord> updateBatch =
+        generateInputBatch(jsc, FSUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), updateSPath,
+            (status) -> status.getName().endsWith("parquet")), schema);
+    String newInstantTs = client.startCommit();
+    client.upsert(updateBatch, newInstantTs);
+    checkBootstrapResults(totalRecords, schema, newInstantTs, false, numInstantsAfterBootstrap + 1,
+        updateTimestamp, deltaCommit ? timestamp : updateTimestamp, deltaCommit);
+
+    if (deltaCommit) {
+      Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
+      assertTrue(compactionInstant.isPresent());
+      client.compact(compactionInstant.get());
+      checkBootstrapResults(totalRecords, schema, compactionInstant.get(), checkNumRawFiles,
+          numInstantsAfterBootstrap + 2, 2, updateTimestamp, updateTimestamp, !deltaCommit,
+          Arrays.asList(compactionInstant.get()));
+    }
+  }
+
+  @Test
+  public void testMetadataBootstrapWithUpdatesMOR() throws Exception {
+    testBootstrapCommon(true, true, EffectiveMode.METADATA_BOOTSTRAP_MODE);
+  }
+
+  @Test
+  public void testFullBoostrapOnlyCOW() throws Exception {
+    testBootstrapCommon(true, false, EffectiveMode.FULL_BOOTSTRAP_MODE);
+  }
+
+  @Test
+  public void testFullBootstrapWithUpdatesMOR() throws Exception {
+    testBootstrapCommon(true, true, EffectiveMode.FULL_BOOTSTRAP_MODE);
+  }
+
+  @Test
+  public void testMetaAndFullBoostrapCOW() throws Exception {
+    testBootstrapCommon(true, false, EffectiveMode.MIXED_BOOTSTRAP_MODE);
+  }
+
+  @Test
+  public void testMetadataAndFullBootstrapWithUpdatesMOR() throws Exception {
+    testBootstrapCommon(true, true, EffectiveMode.MIXED_BOOTSTRAP_MODE);
+  }
+
+  private void checkBootstrapResults(int totalRecords, Schema schema, String maxInstant, boolean checkNumRawFiles,
+      int expNumInstants, double expTimestamp, double expROTimestamp, boolean isDeltaCommit) throws Exception {
+    checkBootstrapResults(totalRecords, schema, maxInstant, checkNumRawFiles, expNumInstants, expNumInstants,
+        expTimestamp, expROTimestamp, isDeltaCommit, Arrays.asList(maxInstant));
+  }
+
+  private void checkBootstrapResults(int totalRecords, Schema schema, String instant, boolean checkNumRawFiles,
+      int expNumInstants, int numVersions, double expTimestamp, double expROTimestamp, boolean isDeltaCommit,
+      List<String> instantsWithValidRecords) throws Exception {
+    metaClient.reloadActiveTimeline();
+    assertEquals(expNumInstants, metaClient.getCommitsTimeline().filterCompletedInstants().countInstants());
+    assertEquals(instant, metaClient.getActiveTimeline()
+        .getCommitsTimeline().filterCompletedInstants().lastInstant().get().getTimestamp());
+
+    Dataset<Row> bootstrapped = sqlContext.read().format("parquet").load(basePath);
+    Dataset<Row> original = sqlContext.read().format("parquet").load(srcPath);
+    bootstrapped.registerTempTable("bootstrapped");
+    original.registerTempTable("original");
+    if (checkNumRawFiles) {
+      List<HoodieFileStatus> files = FSUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), srcPath,
+          (status) -> status.getName().endsWith(".parquet"))
+          .stream().flatMap(x -> x.getValue().stream()).collect(Collectors.toList());
+      assertEquals(files.size() * numVersions,
+          sqlContext.sql("select distinct _hoodie_file_name from bootstrapped").count());
+    }
+
+    if (!isDeltaCommit) {
+      String predicate = String.join(", ",
+          instantsWithValidRecords.stream().map(p -> "\"" + p + "\"").collect(Collectors.toList()));
+      assertEquals(totalRecords, sqlContext.sql("select * from bootstrapped where _hoodie_commit_time IN "
+          + "(" + predicate + ")").count());
+      Dataset<Row> missingOriginal = sqlContext.sql("select a._row_key from original a where a._row_key not "
+          + "in (select _hoodie_record_key from bootstrapped)");
+      assertEquals(0, missingOriginal.count());
+      Dataset<Row> missingBootstrapped = sqlContext.sql("select a._hoodie_record_key from bootstrapped a "
+          + "where a._hoodie_record_key not in (select _row_key from original)");
+      assertEquals(0, missingBootstrapped.count());
+      //sqlContext.sql("select * from bootstrapped").show(10, false);
+    }
+
+    // RO Input Format Read
+    reloadInputFormats();
+    List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>());
+    assertEquals(totalRecords, records.size());
+    Set<String> seenKeys = new HashSet<>();
+    for (GenericRecord r : records) {
+      assertEquals(r.get("_row_key").toString(), r.get("_hoodie_record_key").toString(), "Record :" + r);
+      assertEquals(expROTimestamp, ((DoubleWritable)r.get("timestamp")).get(), 0.1, "Record :" + r);
+      assertFalse(seenKeys.contains(r.get("_hoodie_record_key").toString()));
+      seenKeys.add(r.get("_hoodie_record_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+
+    //RT Input Format Read
+    reloadInputFormats();
+    seenKeys = new HashSet<>();
+    records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>());
+    assertEquals(totalRecords, records.size());
+    for (GenericRecord r : records) {
+      assertEquals(r.get("_row_key").toString(), r.get("_hoodie_record_key").toString(), "Realtime Record :" + r);
+      assertEquals(expTimestamp, ((DoubleWritable)r.get("timestamp")).get(),0.1, "Realtime Record :" + r);
+      assertFalse(seenKeys.contains(r.get("_hoodie_record_key").toString()));
+      seenKeys.add(r.get("_hoodie_record_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+
+    // RO Input Format Read - Project only Hoodie Columns
+    reloadInputFormats();
+    records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES,
+        true, HoodieRecord.HOODIE_META_COLUMNS);
+    assertEquals(totalRecords, records.size());
+    seenKeys = new HashSet<>();
+    for (GenericRecord r : records) {
+      assertFalse(seenKeys.contains(r.get("_hoodie_record_key").toString()));
+      seenKeys.add(r.get("_hoodie_record_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+
+    //RT Input Format Read - Project only Hoodie Columns
+    reloadInputFormats();
+    seenKeys = new HashSet<>();
+    records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, true,
+        HoodieRecord.HOODIE_META_COLUMNS);
+    assertEquals(totalRecords, records.size());
+    for (GenericRecord r : records) {
+      assertFalse(seenKeys.contains(r.get("_hoodie_record_key").toString()));
+      seenKeys.add(r.get("_hoodie_record_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+
+    // RO Input Format Read - Project only non-hoodie column
+    reloadInputFormats();
+    records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, true,
+        Arrays.asList("_row_key"));
+    assertEquals(totalRecords, records.size());
+    seenKeys = new HashSet<>();
+    for (GenericRecord r : records) {
+      assertFalse(seenKeys.contains(r.get("_row_key").toString()));
+      seenKeys.add(r.get("_row_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+
+    //RT Input Format Read - Project only non-hoodie column
+    reloadInputFormats();
+    seenKeys = new HashSet<>();
+    records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, true,
+        Arrays.asList("_row_key"));
+    assertEquals(totalRecords, records.size());
+    for (GenericRecord r : records) {
+      assertFalse(seenKeys.contains(r.get("_row_key").toString()));
+      seenKeys.add(r.get("_row_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+  }
+
+  public static class FullTestBootstrapInputProvider extends RecordDataBootstrapInputProvider {
+
+    public FullTestBootstrapInputProvider(TypedProperties props, JavaSparkContext jsc) {
+      super(props, jsc);
+    }
+
+    @Override
+    public JavaRDD<HoodieRecord> generateInputRecordRDD(String tableName, String sourceBasePath,
+        List<Pair<String, List<HoodieFileStatus>>> partitionPaths) {
+      String filePath = FileStatusUtils.toPath(partitionPaths.stream().flatMap(p -> p.getValue().stream())
+          .findAny().get().getPath()).toString();
+      ParquetFileReader reader = null;
+      try {
+        reader = ParquetFileReader.open(jsc.hadoopConfiguration(), new Path(filePath));
+      } catch (IOException e) {
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+      MessageType parquetSchema = reader.getFooter().getFileMetaData().getSchema();
+      Schema schema =  new AvroSchemaConverter().convert(parquetSchema);
+      return generateInputBatch(jsc, partitionPaths, schema);
+    }
+  }
+
+  private static JavaRDD<HoodieRecord> generateInputBatch(JavaSparkContext jsc,
+      List<Pair<String, List<HoodieFileStatus>>> partitionPaths, Schema writerSchema) {
+    List<Pair<String, Path>> fullFilePathsWithPartition = partitionPaths.stream().flatMap(p -> p.getValue().stream()
+        .map(x -> Pair.of(p.getKey(), FileStatusUtils.toPath(x.getPath())))).collect(Collectors.toList());
+    return jsc.parallelize(fullFilePathsWithPartition.stream().flatMap(p -> {
+      try {
+        Configuration conf = jsc.hadoopConfiguration();
+        AvroReadSupport.setAvroReadSchema(conf, writerSchema);
+        Iterator<GenericRecord> recIterator = new ParquetReaderIterator(
+            AvroParquetReader.<GenericRecord>builder(p.getValue()).withConf(conf).build());
+        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(recIterator, 0), false).map(gr -> {
+          try {
+            String key = gr.get("_row_key").toString();
+            String pPath = p.getKey();
+            return new HoodieRecord<>(new HoodieKey(key, pPath), new RawTripTestPayload(gr.toString(), key, pPath,
+                HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA));
+          } catch (IOException e) {
+            throw new HoodieIOException(e.getMessage(), e);
+          }
+        });
+      } catch (IOException ioe) {
+        throw new HoodieIOException(ioe.getMessage(), ioe);
+      }
+    }).collect(Collectors.toList()));
+  }
+
+  public static class TestRandomBootstapModeSelector extends BootstrapModeSelector {
+
+    private int currIdx = new Random().nextInt(2);
+
+    public TestRandomBootstapModeSelector(HoodieWriteConfig writeConfig) {
+      super(writeConfig);
+    }
+
+    @Override
+    public Map<BootstrapMode, List<String>> select(List<Pair<String, List<HoodieFileStatus>>> partitions) {
+      List<Pair<BootstrapMode, String>> selections = new ArrayList<>();
+      partitions.stream().forEach(p -> {
+        final BootstrapMode mode;
+        if (currIdx == 0) {
+          mode = BootstrapMode.RECORD_METADATA_ONLY_BOOTSTRAP;
+        } else {
+          mode = BootstrapMode.RECORD_DATA_BOOTSTRAP;
+        }
+        currIdx = (currIdx + 1) % 2;
+        selections.add(Pair.of(mode, p.getKey()));
+      });
+      return selections.stream().collect(Collectors.groupingBy(Pair::getKey, mapping(Pair::getValue, toList())));
+    }
+  }
+
+  public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
+    HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr, IndexType.BLOOM)
+        .withExternalSchemaTrasformation(true);
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
+    properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "datestr");
+    builder = builder.withProps(properties);
+    return builder;
+  }
+
+  private static Dataset<Row> generateTestRawTripDataset(double timestamp, int numRecords, List<String> partitionPaths,

Review comment:
       Can you move this to a more common class, or make it public? I am using this method to generate test data for all my test cases: https://github.com/apache/hudi/pull/1702/files#diff-1ba0a227dedf7dfe4aa1b666df01f918







[GitHub] [hudi] vinothchandar commented on a change in pull request #1876: [HUDI-242] Support for RFC-12/Bootstrapping of external datasets

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1876:
URL: https://github.com/apache/hudi/pull/1876#discussion_r464116537



##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.client.bootstrap.BootstrapMode;
+import org.apache.hudi.client.bootstrap.selector.RecordMetadataOnlyBootstrapModeSelector;
+import org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPathTranslator;
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Bootstrap specific configs.
+ */
+public class HoodieBootstrapConfig extends DefaultHoodieConfig {
+
+  public static final String SOURCE_BASE_PATH_PROP = "hoodie.bootstrap.base.path";
+  public static final String BOOTSTRAP_MODE_SELECTOR = "hoodie.bootstrap.mode.selector";
+  public static final String FULL_BOOTRAP_INPUT_PROVIDER = "hoodie.bootstrap.full.input.provider";

Review comment:
       done 







[GitHub] [hudi] umehrot2 commented on a change in pull request #1876: [HUDI-242] Support for RFC-12/Bootstrapping of external datasets

Posted by GitBox <gi...@apache.org>.
umehrot2 commented on a change in pull request #1876:
URL: https://github.com/apache/hudi/pull/1876#discussion_r463889083



##########
File path: hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapSourceSchemaProvider.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.bootstrap;
+
+import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.common.bootstrap.FileStatusUtils;
+import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+
+/**
+ * Bootstrap Schema Provider. Schema provided in config is used. If not available, use schema from Parquet
+ */
+public class BootstrapSourceSchemaProvider {
+
+  protected final HoodieWriteConfig writeConfig;
+
+  public BootstrapSourceSchemaProvider(HoodieWriteConfig writeConfig) {
+    this.writeConfig = writeConfig;
+  }
+
+  /**
+   * Main API to select avro schema for bootstrapping.
+   * @param jsc Java Spark Context
+   * @param partitions  List of partitions with files within them
+   * @return Avro Schema
+   */
+  public final Schema getBootstrapSchema(JavaSparkContext jsc, List<Pair<String, List<HoodieFileStatus>>> partitions) {
+    if (writeConfig.getSchema() != null) {
+      // Use schema specified by user if set
+      return Schema.parse(writeConfig.getSchema());
+    }
+    return getBootstrapSourceSchema(jsc, partitions);
+  }
+
+  /**
+   * Select a random file to be used to generate avro schema.
+   * Override this method to get custom schema selection.
+   * @param jsc Java Spark Context
+   * @param partitions  List of partitions with files within them
+   * @return Avro Schema
+   */
+  protected Schema getBootstrapSourceSchema(JavaSparkContext jsc,
+      List<Pair<String, List<HoodieFileStatus>>> partitions) {
+    return partitions.stream().flatMap(p -> p.getValue().stream())
+        .map(fs -> {
+          try {
+            Path filePath = FileStatusUtils.toPath(fs.getPath());
+            return ParquetUtils.readAvroSchema(jsc.hadoopConfiguration(), filePath);

Review comment:
       @vinothchandar I think we should move this to the `hudi-spark` module. You had this comment on my pull request: https://github.com/apache/hudi/pull/1702/files#r444571139 . Please check out the changes I have made in this class.
   
   Basically, we were running into schema compatibility issues, in particular with the `RecordDataBootstrapProvider`. They happen because here we read the `avro schema` directly from the Parquet footer via `parquet utils`, whereas a later upsert goes through the regular write path, which uses `spark-avro` to convert the `spark schema` into an `avro schema`. The Avro schemas obtained from these two approaches are not compatible with each other.
   
   So, to maintain compatibility, what I had to do is:
   - read the parquet schema
   - convert it to a `spark schema` using Spark's Parquet-to-Spark schema converter
   - convert the `spark schema` to an `avro schema` using `spark-avro`
   
   That is why I had to introduce `spark-avro` in `hudi-client`. If you agree with the above suggestion but do not want `spark-avro` added to `hudi-client`, then I would suggest moving this class to `hudi-spark`.
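   
   For illustration, a minimal sketch of the conversion chain described above, assuming Spark 2.4 with the external `spark-avro` module on the classpath. The class, method, and record names below are hypothetical and not part of the PR; the sketch simply lets Spark's own Parquet reader produce the Catalyst schema and then converts that to Avro with `spark-avro`, so the result matches what the regular write path derives later.
   
       import org.apache.avro.Schema;
       import org.apache.spark.sql.SparkSession;
       import org.apache.spark.sql.avro.SchemaConverters;
       import org.apache.spark.sql.types.StructType;
   
       // Hypothetical helper sketching: Parquet file -> Spark (Catalyst) schema -> Avro schema.
       public class BootstrapSchemaSketch {
         public static Schema readAvroSchemaViaSpark(SparkSession spark, String sourceParquetPath) {
           // Spark's Parquet reader applies the same Parquet-to-Catalyst conversion used at upsert time.
           StructType sparkSchema = spark.read().parquet(sourceParquetPath).schema();
           // spark-avro then converts the Catalyst schema to Avro; record name and namespace are illustrative.
           return SchemaConverters.toAvroType(sparkSchema, false, "hoodie_bootstrap_record", "hoodie.bootstrap");
         }
       }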







[GitHub] [hudi] zhedoubushishi commented on a change in pull request #1876: [HUDI-242] Support for RFC-12/Bootstrapping of external datasets

Posted by GitBox <gi...@apache.org>.
zhedoubushishi commented on a change in pull request #1876:
URL: https://github.com/apache/hudi/pull/1876#discussion_r462526195



##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.client.bootstrap.BootstrapMode;
+import org.apache.hudi.client.bootstrap.selector.RecordMetadataOnlyBootstrapModeSelector;
+import org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPathTranslator;
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Bootstrap specific configs.
+ */
+public class HoodieBootstrapConfig extends DefaultHoodieConfig {
+
+  public static final String SOURCE_BASE_PATH_PROP = "hoodie.bootstrap.base.path";
+  public static final String BOOTSTRAP_MODE_SELECTOR = "hoodie.bootstrap.mode.selector";
+  public static final String FULL_BOOTRAP_INPUT_PROVIDER = "hoodie.bootstrap.full.input.provider";

Review comment:
       [typo] FULL_BOOTRAP_INPUT_PROVIDER -> FULL_BOOTSTRAP_INPUT_PROVIDER







[GitHub] [hudi] vinothchandar merged pull request #1876: [HUDI-242] Support for RFC-12/Bootstrapping of external datasets

Posted by GitBox <gi...@apache.org>.
vinothchandar merged pull request #1876:
URL: https://github.com/apache/hudi/pull/1876


   





[GitHub] [hudi] vinothchandar commented on pull request #1876: [HUDI-242] Support for RFC-12/Bootstrapping of external datasets

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1876:
URL: https://github.com/apache/hudi/pull/1876#issuecomment-668358985


   @umehrot2 you can rebase now!





[GitHub] [hudi] yanghua commented on pull request #1876: [HUDI-242] Support for RFC-12/Bootstrapping of external datasets

Posted by GitBox <gi...@apache.org>.
yanghua commented on pull request #1876:
URL: https://github.com/apache/hudi/pull/1876#issuecomment-665071390


   @vinothchandar conflicts...





[GitHub] [hudi] vinothchandar commented on a change in pull request #1876: [HUDI-242] Support for RFC-12/Bootstrapping of external datasets

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1876:
URL: https://github.com/apache/hudi/pull/1876#discussion_r464679020



##########
File path: hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
##########
+    Set<String> seenKeys = new HashSet<>();
+    for (GenericRecord r : records) {
+      assertEquals(r.get("_row_key").toString(), r.get("_hoodie_record_key").toString(), "Record :" + r);
+      assertEquals(expROTimestamp, ((DoubleWritable)r.get("timestamp")).get(), 0.1, "Record :" + r);
+      assertFalse(seenKeys.contains(r.get("_hoodie_record_key").toString()));
+      seenKeys.add(r.get("_hoodie_record_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+
+    //RT Input Format Read
+    reloadInputFormats();
+    seenKeys = new HashSet<>();
+    records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>());
+    assertEquals(totalRecords, records.size());
+    for (GenericRecord r : records) {
+      assertEquals(r.get("_row_key").toString(), r.get("_hoodie_record_key").toString(), "Realtime Record :" + r);
+      assertEquals(expTimestamp, ((DoubleWritable)r.get("timestamp")).get(),0.1, "Realtime Record :" + r);
+      assertFalse(seenKeys.contains(r.get("_hoodie_record_key").toString()));
+      seenKeys.add(r.get("_hoodie_record_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+
+    // RO Input Format Read - Project only Hoodie Columns
+    reloadInputFormats();
+    records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES,
+        true, HoodieRecord.HOODIE_META_COLUMNS);
+    assertEquals(totalRecords, records.size());
+    seenKeys = new HashSet<>();
+    for (GenericRecord r : records) {
+      assertFalse(seenKeys.contains(r.get("_hoodie_record_key").toString()));
+      seenKeys.add(r.get("_hoodie_record_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+
+    //RT Input Format Read - Project only Hoodie Columns
+    reloadInputFormats();
+    seenKeys = new HashSet<>();
+    records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, true,
+        HoodieRecord.HOODIE_META_COLUMNS);
+    assertEquals(totalRecords, records.size());
+    for (GenericRecord r : records) {
+      assertFalse(seenKeys.contains(r.get("_hoodie_record_key").toString()));
+      seenKeys.add(r.get("_hoodie_record_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+
+    // RO Input Format Read - Project only non-hoodie column
+    reloadInputFormats();
+    records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, true,
+        Arrays.asList("_row_key"));
+    assertEquals(totalRecords, records.size());
+    seenKeys = new HashSet<>();
+    for (GenericRecord r : records) {
+      assertFalse(seenKeys.contains(r.get("_row_key").toString()));
+      seenKeys.add(r.get("_row_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+
+    //RT Input Format Read - Project only non-hoodie column
+    reloadInputFormats();
+    seenKeys = new HashSet<>();
+    records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, true,
+        Arrays.asList("_row_key"));
+    assertEquals(totalRecords, records.size());
+    for (GenericRecord r : records) {
+      assertFalse(seenKeys.contains(r.get("_row_key").toString()));
+      seenKeys.add(r.get("_row_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+  }
+
+  public static class FullTestBootstrapInputProvider extends RecordDataBootstrapInputProvider {
+
+    public FullTestBootstrapInputProvider(TypedProperties props, JavaSparkContext jsc) {
+      super(props, jsc);
+    }
+
+    @Override
+    public JavaRDD<HoodieRecord> generateInputRecordRDD(String tableName, String sourceBasePath,
+        List<Pair<String, List<HoodieFileStatus>>> partitionPaths) {
+      String filePath = FileStatusUtils.toPath(partitionPaths.stream().flatMap(p -> p.getValue().stream())
+          .findAny().get().getPath()).toString();
+      ParquetFileReader reader = null;
+      try {
+        reader = ParquetFileReader.open(jsc.hadoopConfiguration(), new Path(filePath));
+      } catch (IOException e) {
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+      MessageType parquetSchema = reader.getFooter().getFileMetaData().getSchema();
+      Schema schema =  new AvroSchemaConverter().convert(parquetSchema);
+      return generateInputBatch(jsc, partitionPaths, schema);
+    }
+  }
+
+  private static JavaRDD<HoodieRecord> generateInputBatch(JavaSparkContext jsc,
+      List<Pair<String, List<HoodieFileStatus>>> partitionPaths, Schema writerSchema) {
+    List<Pair<String, Path>> fullFilePathsWithPartition = partitionPaths.stream().flatMap(p -> p.getValue().stream()
+        .map(x -> Pair.of(p.getKey(), FileStatusUtils.toPath(x.getPath())))).collect(Collectors.toList());
+    return jsc.parallelize(fullFilePathsWithPartition.stream().flatMap(p -> {
+      try {
+        Configuration conf = jsc.hadoopConfiguration();
+        AvroReadSupport.setAvroReadSchema(conf, writerSchema);
+        Iterator<GenericRecord> recIterator = new ParquetReaderIterator(
+            AvroParquetReader.<GenericRecord>builder(p.getValue()).withConf(conf).build());
+        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(recIterator, 0), false).map(gr -> {
+          try {
+            String key = gr.get("_row_key").toString();
+            String pPath = p.getKey();
+            return new HoodieRecord<>(new HoodieKey(key, pPath), new RawTripTestPayload(gr.toString(), key, pPath,
+                HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA));
+          } catch (IOException e) {
+            throw new HoodieIOException(e.getMessage(), e);
+          }
+        });
+      } catch (IOException ioe) {
+        throw new HoodieIOException(ioe.getMessage(), ioe);
+      }
+    }).collect(Collectors.toList()));
+  }
+
+  public static class TestRandomBootstapModeSelector extends BootstrapModeSelector {
+
+    private int currIdx = new Random().nextInt(2);
+
+    public TestRandomBootstapModeSelector(HoodieWriteConfig writeConfig) {
+      super(writeConfig);
+    }
+
+    @Override
+    public Map<BootstrapMode, List<String>> select(List<Pair<String, List<HoodieFileStatus>>> partitions) {
+      List<Pair<BootstrapMode, String>> selections = new ArrayList<>();
+      partitions.stream().forEach(p -> {
+        final BootstrapMode mode;
+        if (currIdx == 0) {
+          mode = BootstrapMode.RECORD_METADATA_ONLY_BOOTSTRAP;
+        } else {
+          mode = BootstrapMode.RECORD_DATA_BOOTSTRAP;
+        }
+        currIdx = (currIdx + 1) % 2;
+        selections.add(Pair.of(mode, p.getKey()));
+      });
+      return selections.stream().collect(Collectors.groupingBy(Pair::getKey, mapping(Pair::getValue, toList())));
+    }
+  }
+
+  public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
+    HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr, IndexType.BLOOM)
+        .withExternalSchemaTrasformation(true);
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
+    properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "datestr");
+    builder = builder.withProps(properties);
+    return builder;
+  }
+
+  private static Dataset<Row> generateTestRawTripDataset(double timestamp, int numRecords, List<String> partitionPaths,

Review comment:
       Feel free to take this on in your PR? That way you can make sure it works for you.
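The helper under discussion, `generateTestRawTripDataset`, produces the raw (non-Hudi) trip data used as the bootstrap source; its body is cut off in this hunk. A minimal, hypothetical sketch of such a generator, assuming a simplified column set (`_row_key`, `timestamp`, `datestr`) rather than the full trip schema in this PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class RawTripDatasetSketch {

  // Hypothetical, simplified stand-in for generateTestRawTripDataset: emits
  // numRecords rows spread round-robin across the given partition values.
  public static Dataset<Row> generateRawTripDataset(SQLContext sqlContext, double timestamp,
      int numRecords, List<String> partitionPaths) {
    StructType schema = new StructType()
        .add("_row_key", DataTypes.StringType)
        .add("timestamp", DataTypes.DoubleType)
        .add("datestr", DataTypes.StringType);
    List<Row> rows = new ArrayList<>(numRecords);
    for (int i = 0; i < numRecords; i++) {
      String datestr = (partitionPaths == null || partitionPaths.isEmpty())
          ? "" : partitionPaths.get(i % partitionPaths.size());
      rows.add(RowFactory.create(UUID.randomUUID().toString(), timestamp, datestr));
    }
    return sqlContext.createDataFrame(rows, schema);
  }
}
```

The test writes this frame out as parquet partitioned by `datestr`, so a sketch like this only needs to match that column layout.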







[GitHub] [hudi] vinothchandar commented on a change in pull request #1876: [HUDI-242] Support for RFC-12/Bootstrapping of external datasets

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1876:
URL: https://github.com/apache/hudi/pull/1876#discussion_r464678256



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -627,6 +672,128 @@ private HoodieInstant fetchNextCompactionInstant() throws InterruptedException {
   }
 
   public DeltaSyncService getDeltaSyncService() {
-    return deltaSyncService;
+    return deltaSyncService.get();
+  }
+
+  /**
+   * Performs bootstrap from a non-hudi source.
+   */
+  public static class BootstrapExecutor  implements Serializable {

Review comment:
       Moved. I was mulling the same thing.







[GitHub] [hudi] vinothchandar commented on pull request #1876: [HUDI-242] Support for RFC-12/Bootstrapping of external datasets

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1876:
URL: https://github.com/apache/hudi/pull/1876#issuecomment-665155023


   @yanghua on it. Still trying to get all the tests passing against master.





[GitHub] [hudi] vinothchandar commented on a change in pull request #1876: [HUDI-242] Support for RFC-12/Bootstrapping of external datasets

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1876:
URL: https://github.com/apache/hudi/pull/1876#discussion_r464155639



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##########
@@ -516,4 +528,73 @@ public static Configuration registerFileSystem(Path file, Configuration conf) {
     return returnConf;
   }
 
+  /**
+   * Get the FS implementation for this table.
+   * @param path  Path String
+   * @param hadoopConf  Serializable Hadoop Configuration
+   * @param consistencyGuardConfig Consistency Guard Config
+   * @return HoodieWrapperFileSystem
+   */
+  public static HoodieWrapperFileSystem getFs(String path, SerializableConfiguration hadoopConf,
+      ConsistencyGuardConfig consistencyGuardConfig) {
+    FileSystem fileSystem = FSUtils.getFs(path, hadoopConf.newCopy());
+    //Preconditions.checkArgument(!(fileSystem instanceof HoodieWrapperFileSystem),
+    //    "File System not expected to be that of HoodieWrapperFileSystem");
+    return new HoodieWrapperFileSystem(fileSystem,
+        consistencyGuardConfig.isConsistencyCheckEnabled()
+            ? new FailSafeConsistencyGuard(fileSystem, consistencyGuardConfig)
+            : new NoOpConsistencyGuard());
+  }
+
+  /**
+   * Returns leaf folders with files under a path.
+   * @param fs  File System
+   * @param basePathStr Base Path to look for leaf folders
+   * @param filePathFilter  Filters to skip directories/paths
+   * @return list of partition paths with files under them.
+   * @throws IOException
+   */
+  public static List<Pair<String, List<HoodieFileStatus>>> getAllLeafFoldersWithFiles(FileSystem fs, String basePathStr,
+      PathFilter filePathFilter) throws IOException {

Review comment:
       Done. Will move this to a `BootstrapUtils` class; not sure why it was in `hudi-common` in the first place. Good call.







[GitHub] [hudi] vinothchandar commented on pull request #1876: [HUDI-242] Support for RFC-12/Bootstrapping of external datasets

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1876:
URL: https://github.com/apache/hudi/pull/1876#issuecomment-664060914


   Behind on getting the tests to pass again. Working on it 





[GitHub] [hudi] zhedoubushishi commented on a change in pull request #1876: [HUDI-242] Support for RFC-12/Bootstrapping of external datasets

Posted by GitBox <gi...@apache.org>.
zhedoubushishi commented on a change in pull request #1876:
URL: https://github.com/apache/hudi/pull/1876#discussion_r464620123



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -627,6 +672,128 @@ private HoodieInstant fetchNextCompactionInstant() throws InterruptedException {
   }
 
   public DeltaSyncService getDeltaSyncService() {
-    return deltaSyncService;
+    return deltaSyncService.get();
+  }
+
+  /**
+   * Performs bootstrap from a non-hudi source.
+   */
+  public static class BootstrapExecutor  implements Serializable {

Review comment:
       Do you think it would be better to relocate this as a separate top-level class, like [HDFSParquetImporter](https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/HDFSParquetImporter.java) or [HoodieCompactor](https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java)?
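The two classes linked above are standalone utilities in hudi-utilities that pair a JCommander-annotated `Config` with a `main` entry point that builds a `JavaSparkContext` and runs the job. A rough, purely illustrative skeleton of a relocated bootstrap runner in that style (class name, flags, and the `runBootstrap` helper are hypothetical, not the PR's actual `BootstrapExecutor`):

```java
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical skeleton only: mirrors the Config + main(...) shape of the
// existing standalone utilities; the real BootstrapExecutor in this PR differs.
public class StandaloneBootstrapRunner {

  public static class Config {
    @Parameter(names = {"--source-base-path"}, description = "Base path of the existing non-Hudi dataset", required = true)
    public String sourceBasePath;

    @Parameter(names = {"--target-base-path"}, description = "Base path of the bootstrapped Hudi table", required = true)
    public String targetBasePath;

    @Parameter(names = {"--table-name"}, description = "Name of the target table", required = true)
    public String tableName;

    @Parameter(names = {"--help", "-h"}, help = true)
    public Boolean help = false;
  }

  public static void main(String[] args) {
    Config cfg = new Config();
    JCommander cmd = new JCommander(cfg);
    cmd.parse(args);
    if (cfg.help) {
      cmd.usage();
      System.exit(1);
    }
    SparkConf sparkConf = new SparkConf().setAppName("bootstrap-" + cfg.tableName);
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);
    try {
      runBootstrap(jsc, cfg);
    } finally {
      jsc.stop();
    }
  }

  private static void runBootstrap(JavaSparkContext jsc, Config cfg) {
    // Wire up the write config (including the bootstrap config) here and trigger
    // the bootstrap, e.g. via HoodieWriteClient#bootstrap(Option.empty()) as
    // exercised in TestBootstrap above.
  }
}
```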







[GitHub] [hudi] umehrot2 commented on a change in pull request #1876: [HUDI-242] Support for RFC-12/Bootstrapping of external datasets

Posted by GitBox <gi...@apache.org>.
umehrot2 commented on a change in pull request #1876:
URL: https://github.com/apache/hudi/pull/1876#discussion_r463904220



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
##########
@@ -516,4 +528,73 @@ public static Configuration registerFileSystem(Path file, Configuration conf) {
     return returnConf;
   }
 
+  /**
+   * Get the FS implementation for this table.
+   * @param path  Path String
+   * @param hadoopConf  Serializable Hadoop Configuration
+   * @param consistencyGuardConfig Consistency Guard Config
+   * @return HoodieWrapperFileSystem
+   */
+  public static HoodieWrapperFileSystem getFs(String path, SerializableConfiguration hadoopConf,
+      ConsistencyGuardConfig consistencyGuardConfig) {
+    FileSystem fileSystem = FSUtils.getFs(path, hadoopConf.newCopy());
+    //Preconditions.checkArgument(!(fileSystem instanceof HoodieWrapperFileSystem),
+    //    "File System not expected to be that of HoodieWrapperFileSystem");
+    return new HoodieWrapperFileSystem(fileSystem,
+        consistencyGuardConfig.isConsistencyCheckEnabled()
+            ? new FailSafeConsistencyGuard(fileSystem, consistencyGuardConfig)
+            : new NoOpConsistencyGuard());
+  }
+
+  /**
+   * Returns leaf folders with files under a path.
+   * @param fs  File System
+   * @param basePathStr Base Path to look for leaf folders
+   * @param filePathFilter  Filters to skip directories/paths
+   * @return list of partition paths with files under them.
+   * @throws IOException
+   */
+  public static List<Pair<String, List<HoodieFileStatus>>> getAllLeafFoldersWithFiles(FileSystem fs, String basePathStr,
+      PathFilter filePathFilter) throws IOException {

Review comment:
       Can we move this to `hudi-spark` or `hudi-client` instead, which have Spark as a dependency? For https://issues.apache.org/jira/browse/HUDI-999 I am parallelizing this using the Spark context, and as we have discussed earlier we do not want a Spark dependency in `hudi-common`.
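A minimal sketch of what the parallelized listing could look like once the method lives in a Spark-aware module. Only the `FSUtils.getFs(String, Configuration)` helper and the `SerializableConfiguration#newCopy()` usage are taken from the diff above; the class name, the return type (file paths instead of `FileStatus`), the package imports, and the walk logic are assumptions for illustration:

```java
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaSparkContext;

import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.collection.Pair;

// Hypothetical sketch: fan out leaf-folder discovery so that each first-level
// directory under the source base path is walked by its own Spark task.
public class ParallelLeafFolderListingSketch {

  public static List<Pair<String, List<String>>> getAllLeafFoldersWithFiles(JavaSparkContext jsc,
      String basePathStr, String fileSuffix, SerializableConfiguration serConf) throws IOException {
    FileSystem fs = FSUtils.getFs(basePathStr, serConf.newCopy());
    Path qualifiedBase = fs.makeQualified(new Path(basePathStr));
    String qualifiedBaseStr = qualifiedBase.toString();
    // Enumerate first-level children on the driver ...
    List<String> topLevelDirs = Arrays.stream(fs.listStatus(qualifiedBase))
        .filter(FileStatus::isDirectory)
        .map(status -> status.getPath().toString())
        .collect(Collectors.toList());
    if (topLevelDirs.isEmpty()) {
      return walk(fs, qualifiedBase, qualifiedBase, fileSuffix);
    }
    // ... then walk each first-level subtree in its own Spark task.
    return jsc.parallelize(topLevelDirs, topLevelDirs.size())
        .flatMap(dir -> walk(FSUtils.getFs(dir, serConf.newCopy()), new Path(qualifiedBaseStr),
            new Path(dir), fileSuffix).iterator())
        .collect();
  }

  // Collects (relative partition path -> matching file paths) for every directory
  // that directly contains at least one file with the given suffix.
  private static List<Pair<String, List<String>>> walk(FileSystem fs, Path basePath, Path dir,
      String fileSuffix) throws IOException {
    List<Pair<String, List<String>>> result = new ArrayList<>();
    List<String> files = new ArrayList<>();
    for (FileStatus status : fs.listStatus(dir)) {
      if (status.isDirectory()) {
        result.addAll(walk(fs, basePath, status.getPath(), fileSuffix));
      } else if (status.getPath().getName().endsWith(fileSuffix)) {
        files.add(status.getPath().toString());
      }
    }
    if (!files.isEmpty()) {
      URI relative = basePath.toUri().relativize(dir.toUri());
      result.add(Pair.of(relative.getPath(), files));
    }
    return result;
  }
}
```

Listing only the first level on the driver keeps the fan-out simple; how the actual HUDI-999 change slices the work may well differ.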







[GitHub] [hudi] umehrot2 commented on pull request #1876: [HUDI-242] Support for RFC-12/Bootstrapping of external datasets

Posted by GitBox <gi...@apache.org>.
umehrot2 commented on pull request #1876:
URL: https://github.com/apache/hudi/pull/1876#issuecomment-665984117


   > finally @bvaradar @umehrot2 tests passing now.
   
   @vinothchandar So what is the next step? Do we do a thorough review of this? Also, shall I start rebasing my changes on top of this?





[GitHub] [hudi] umehrot2 commented on a change in pull request #1876: [HUDI-242] Support for RFC-12/Bootstrapping of external datasets

Posted by GitBox <gi...@apache.org>.
umehrot2 commented on a change in pull request #1876:
URL: https://github.com/apache/hudi/pull/1876#discussion_r464692792



##########
File path: hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
##########
@@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.client.bootstrap.BootstrapMode;
+import org.apache.hudi.client.bootstrap.RecordDataBootstrapInputProvider;
+import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
+import org.apache.hudi.client.bootstrap.selector.RecordDataBootstrapModeSelector;
+import org.apache.hudi.client.bootstrap.selector.RecordMetadataOnlyBootstrapModeSelector;
+import org.apache.hudi.common.bootstrap.FileStatusUtils;
+import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstant.State;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.testutils.RawTripTestPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieBootstrapConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.HoodieParquetInputFormat;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
+import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+import org.apache.hudi.testutils.HoodieMergeOnReadTestUtils;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.schema.MessageType;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.api.java.UDF1;
+import org.apache.spark.sql.types.DataTypes;
+
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.Random;
+import java.util.Spliterators;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toList;
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.generateGenericRecord;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.apache.spark.sql.functions.callUDF;
+
+/**
+ * Tests Bootstrap Client functionality.
+ */
+public class TestBootstrap extends HoodieClientTestBase {
+
+  //FIXME(bootstrap): why is this test so darn slow?
+
+  public static final String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,"
+      + "struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";
+
+  @TempDir
+  public java.nio.file.Path tmpFolder;
+
+  protected String srcPath = null;
+
+  private HoodieParquetInputFormat roInputFormat;
+  private JobConf roJobConf;
+
+  private HoodieParquetRealtimeInputFormat rtInputFormat;
+  private JobConf rtJobConf;
+  private SparkSession spark;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    srcPath = tmpFolder.toAbsolutePath().toString() + "/data";
+    initPath();
+    spark = SparkSession.builder()
+        .appName("Bootstrap test")
+        .master("local[2]")
+        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+        .getOrCreate();
+    jsc = new JavaSparkContext(spark.sparkContext());
+    sqlContext = spark.sqlContext();
+    hadoopConf = spark.sparkContext().hadoopConfiguration();
+    initTestDataGenerator();
+    initMetaClient();
+    // initialize parquet input format
+    reloadInputFormats();
+  }
+
+  @AfterEach
+  public void tearDown() throws IOException {
+    cleanupClients();
+    cleanupTestDataGenerator();
+  }
+
+  private void reloadInputFormats() {
+    // initialize parquet input format
+    roInputFormat = new HoodieParquetInputFormat();
+    roJobConf = new JobConf(jsc.hadoopConfiguration());
+    roInputFormat.setConf(roJobConf);
+
+    rtInputFormat = new HoodieParquetRealtimeInputFormat();
+    rtJobConf = new JobConf(jsc.hadoopConfiguration());
+    rtInputFormat.setConf(rtJobConf);
+  }
+
+  public Schema generateNewDataSetAndReturnSchema(double timestamp, int numRecords, List<String> partitionPaths,
+      String srcPath) throws Exception {
+    boolean isPartitioned = partitionPaths != null && !partitionPaths.isEmpty();
+    Dataset<Row> df = generateTestRawTripDataset(timestamp, numRecords, partitionPaths, jsc, sqlContext);
+    df.printSchema();
+    if (isPartitioned) {
+      df.write().partitionBy("datestr").format("parquet").mode(SaveMode.Overwrite).save(srcPath);
+    } else {
+      df.write().format("parquet").mode(SaveMode.Overwrite).save(srcPath);
+    }
+    String filePath = FileStatusUtils.toPath(FSUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), srcPath,
+        (status) -> status.getName().endsWith(".parquet")).stream().findAny().map(p -> p.getValue().stream().findAny())
+        .orElse(null).get().getPath()).toString();
+    ParquetFileReader reader = ParquetFileReader.open(metaClient.getHadoopConf(), new Path(filePath));
+    MessageType schema = reader.getFooter().getFileMetaData().getSchema();
+    return new AvroSchemaConverter().convert(schema);
+  }
+
+  @Test
+  public void testMetadataBootstrapUnpartitionedCOW() throws Exception {
+    testBootstrapCommon(false, false, EffectiveMode.METADATA_BOOTSTRAP_MODE);
+  }
+
+  @Test
+  public void testMetadataBootstrapWithUpdatesCOW() throws Exception {
+    testBootstrapCommon(true, false, EffectiveMode.METADATA_BOOTSTRAP_MODE);
+  }
+
+  private enum EffectiveMode {
+    FULL_BOOTSTRAP_MODE,
+    METADATA_BOOTSTRAP_MODE,
+    MIXED_BOOTSTRAP_MODE
+  }
+
+  private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, EffectiveMode mode) throws Exception {
+    if (deltaCommit) {
+      metaClient = HoodieTestUtils.init(basePath, HoodieTableType.MERGE_ON_READ);
+    }
+    int totalRecords = 100;
+    String keyGeneratorClass = partitioned ? SimpleKeyGenerator.class.getCanonicalName()
+        : NonpartitionedKeyGenerator.class.getCanonicalName();
+    final String bootstrapModeSelectorClass;
+    final String bootstrapCommitInstantTs;
+    final boolean checkNumRawFiles;
+    final boolean isBootstrapIndexCreated;
+    final int numInstantsAfterBootstrap;
+    final List<String> bootstrapInstants;
+    switch (mode) {
+      case FULL_BOOTSTRAP_MODE:
+        bootstrapModeSelectorClass = RecordDataBootstrapModeSelector.class.getCanonicalName();
+        bootstrapCommitInstantTs = HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS;
+        checkNumRawFiles = false;
+        isBootstrapIndexCreated = false;
+        numInstantsAfterBootstrap = 1;
+        bootstrapInstants = Arrays.asList(bootstrapCommitInstantTs);
+        break;
+      case METADATA_BOOTSTRAP_MODE:
+        bootstrapModeSelectorClass = RecordMetadataOnlyBootstrapModeSelector.class.getCanonicalName();
+        bootstrapCommitInstantTs = HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS;
+        checkNumRawFiles = true;
+        isBootstrapIndexCreated = true;
+        numInstantsAfterBootstrap = 1;
+        bootstrapInstants = Arrays.asList(bootstrapCommitInstantTs);
+        break;
+      default:
+        bootstrapModeSelectorClass = TestRandomBootstapModeSelector.class.getName();
+        bootstrapCommitInstantTs = HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS;
+        checkNumRawFiles = false;
+        isBootstrapIndexCreated = true;
+        numInstantsAfterBootstrap = 2;
+        bootstrapInstants = Arrays.asList(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
+            HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);
+        break;
+    }
+    List<String> partitions = Arrays.asList("2020/04/01", "2020/04/02", "2020/04/03");
+    double timestamp = new Double(Instant.now().toEpochMilli()).longValue();
+    Schema schema = generateNewDataSetAndReturnSchema(timestamp, totalRecords, partitions, srcPath);
+    HoodieWriteConfig config = getConfigBuilder(schema.toString())
+        .withAutoCommit(true)
+        .withSchema(schema.toString())
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            .withMaxNumDeltaCommitsBeforeCompaction(1)
+            .build())
+        .withBootstrapConfig(HoodieBootstrapConfig.newBuilder()
+            .withBootstrapSourceBasePath(srcPath)
+            .withBootstrapKeyGenClass(keyGeneratorClass)
+            .withFullBootstrapInputProvider(FullTestBootstrapInputProvider.class.getName())
+            .withBootstrapParallelism(3)
+            .withBootstrapModeSelector(bootstrapModeSelectorClass).build())
+        .build();
+    HoodieWriteClient client = new HoodieWriteClient(jsc, config);
+    client.bootstrap(Option.empty());
+    checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
+        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants);
+
+    // Rollback Bootstrap
+    FSUtils.deleteInstantFile(metaClient.getFs(), metaClient.getMetaPath(), new HoodieInstant(State.COMPLETED,
+        deltaCommit ? HoodieTimeline.DELTA_COMMIT_ACTION : HoodieTimeline.COMMIT_ACTION, bootstrapCommitInstantTs));
+    client.rollBackInflightBootstrap();
+    metaClient.reloadActiveTimeline();
+    assertEquals(0, metaClient.getCommitsTimeline().countInstants());
+    assertEquals(0L, FSUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), basePath,
+        (status) -> status.getName().endsWith(".parquet")).stream().flatMap(f -> f.getValue().stream()).count());
+
+    BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
+    assertFalse(index.isIndexAvailable());
+
+    // Run bootstrap again
+    client = new HoodieWriteClient(jsc, config);
+    client.bootstrap(Option.empty());
+
+    metaClient.reloadActiveTimeline();
+    index = BootstrapIndex.getBootstrapIndex(metaClient);
+    if (isBootstrapIndexCreated) {
+      assertTrue(index.isIndexAvailable());
+    } else {
+      assertFalse(index.isIndexAvailable());
+    }
+
+    checkBootstrapResults(totalRecords, schema, bootstrapCommitInstantTs, checkNumRawFiles, numInstantsAfterBootstrap,
+        numInstantsAfterBootstrap, timestamp, timestamp, deltaCommit, bootstrapInstants);
+
+    // Upsert case
+    double updateTimestamp = new Double(Instant.now().toEpochMilli()).longValue();
+    String updateSPath = tmpFolder.toAbsolutePath().toString() + "/data2";
+    generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords, partitions, updateSPath);
+    JavaRDD<HoodieRecord> updateBatch =
+        generateInputBatch(jsc, FSUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), updateSPath,
+            (status) -> status.getName().endsWith("parquet")), schema);
+    String newInstantTs = client.startCommit();
+    client.upsert(updateBatch, newInstantTs);
+    checkBootstrapResults(totalRecords, schema, newInstantTs, false, numInstantsAfterBootstrap + 1,
+        updateTimestamp, deltaCommit ? timestamp : updateTimestamp, deltaCommit);
+
+    if (deltaCommit) {
+      Option<String> compactionInstant = client.scheduleCompaction(Option.empty());
+      assertTrue(compactionInstant.isPresent());
+      client.compact(compactionInstant.get());
+      checkBootstrapResults(totalRecords, schema, compactionInstant.get(), checkNumRawFiles,
+          numInstantsAfterBootstrap + 2, 2, updateTimestamp, updateTimestamp, !deltaCommit,
+          Arrays.asList(compactionInstant.get()));
+    }
+  }
+
+  @Test
+  public void testMetadataBootstrapWithUpdatesMOR() throws Exception {
+    testBootstrapCommon(true, true, EffectiveMode.METADATA_BOOTSTRAP_MODE);
+  }
+
+  @Test
+  public void testFullBoostrapOnlyCOW() throws Exception {
+    testBootstrapCommon(true, false, EffectiveMode.FULL_BOOTSTRAP_MODE);
+  }
+
+  @Test
+  public void testFullBootstrapWithUpdatesMOR() throws Exception {
+    testBootstrapCommon(true, true, EffectiveMode.FULL_BOOTSTRAP_MODE);
+  }
+
+  @Test
+  public void testMetaAndFullBoostrapCOW() throws Exception {
+    testBootstrapCommon(true, false, EffectiveMode.MIXED_BOOTSTRAP_MODE);
+  }
+
+  @Test
+  public void testMetadataAndFullBootstrapWithUpdatesMOR() throws Exception {
+    testBootstrapCommon(true, true, EffectiveMode.MIXED_BOOTSTRAP_MODE);
+  }
+
+  private void checkBootstrapResults(int totalRecords, Schema schema, String maxInstant, boolean checkNumRawFiles,
+      int expNumInstants, double expTimestamp, double expROTimestamp, boolean isDeltaCommit) throws Exception {
+    checkBootstrapResults(totalRecords, schema, maxInstant, checkNumRawFiles, expNumInstants, expNumInstants,
+        expTimestamp, expROTimestamp, isDeltaCommit, Arrays.asList(maxInstant));
+  }
+
+  private void checkBootstrapResults(int totalRecords, Schema schema, String instant, boolean checkNumRawFiles,
+      int expNumInstants, int numVersions, double expTimestamp, double expROTimestamp, boolean isDeltaCommit,
+      List<String> instantsWithValidRecords) throws Exception {
+    metaClient.reloadActiveTimeline();
+    assertEquals(expNumInstants, metaClient.getCommitsTimeline().filterCompletedInstants().countInstants());
+    assertEquals(instant, metaClient.getActiveTimeline()
+        .getCommitsTimeline().filterCompletedInstants().lastInstant().get().getTimestamp());
+
+    Dataset<Row> bootstrapped = sqlContext.read().format("parquet").load(basePath);
+    Dataset<Row> original = sqlContext.read().format("parquet").load(srcPath);
+    bootstrapped.registerTempTable("bootstrapped");
+    original.registerTempTable("original");
+    if (checkNumRawFiles) {
+      List<HoodieFileStatus> files = FSUtils.getAllLeafFoldersWithFiles(metaClient.getFs(), srcPath,
+          (status) -> status.getName().endsWith(".parquet"))
+          .stream().flatMap(x -> x.getValue().stream()).collect(Collectors.toList());
+      assertEquals(files.size() * numVersions,
+          sqlContext.sql("select distinct _hoodie_file_name from bootstrapped").count());
+    }
+
+    if (!isDeltaCommit) {
+      String predicate = String.join(", ",
+          instantsWithValidRecords.stream().map(p -> "\"" + p + "\"").collect(Collectors.toList()));
+      assertEquals(totalRecords, sqlContext.sql("select * from bootstrapped where _hoodie_commit_time IN "
+          + "(" + predicate + ")").count());
+      Dataset<Row> missingOriginal = sqlContext.sql("select a._row_key from original a where a._row_key not "
+          + "in (select _hoodie_record_key from bootstrapped)");
+      assertEquals(0, missingOriginal.count());
+      Dataset<Row> missingBootstrapped = sqlContext.sql("select a._hoodie_record_key from bootstrapped a "
+          + "where a._hoodie_record_key not in (select _row_key from original)");
+      assertEquals(0, missingBootstrapped.count());
+      //sqlContext.sql("select * from bootstrapped").show(10, false);
+    }
+
+    // RO Input Format Read
+    reloadInputFormats();
+    List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>());
+    assertEquals(totalRecords, records.size());
+    Set<String> seenKeys = new HashSet<>();
+    for (GenericRecord r : records) {
+      assertEquals(r.get("_row_key").toString(), r.get("_hoodie_record_key").toString(), "Record :" + r);
+      assertEquals(expROTimestamp, ((DoubleWritable)r.get("timestamp")).get(), 0.1, "Record :" + r);
+      assertFalse(seenKeys.contains(r.get("_hoodie_record_key").toString()));
+      seenKeys.add(r.get("_hoodie_record_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+
+    //RT Input Format Read
+    reloadInputFormats();
+    seenKeys = new HashSet<>();
+    records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>());
+    assertEquals(totalRecords, records.size());
+    for (GenericRecord r : records) {
+      assertEquals(r.get("_row_key").toString(), r.get("_hoodie_record_key").toString(), "Realtime Record :" + r);
+      assertEquals(expTimestamp, ((DoubleWritable)r.get("timestamp")).get(),0.1, "Realtime Record :" + r);
+      assertFalse(seenKeys.contains(r.get("_hoodie_record_key").toString()));
+      seenKeys.add(r.get("_hoodie_record_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+
+    // RO Input Format Read - Project only Hoodie Columns
+    reloadInputFormats();
+    records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES,
+        true, HoodieRecord.HOODIE_META_COLUMNS);
+    assertEquals(totalRecords, records.size());
+    seenKeys = new HashSet<>();
+    for (GenericRecord r : records) {
+      assertFalse(seenKeys.contains(r.get("_hoodie_record_key").toString()));
+      seenKeys.add(r.get("_hoodie_record_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+
+    //RT Input Format Read - Project only Hoodie Columns
+    reloadInputFormats();
+    seenKeys = new HashSet<>();
+    records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, true,
+        HoodieRecord.HOODIE_META_COLUMNS);
+    assertEquals(totalRecords, records.size());
+    for (GenericRecord r : records) {
+      assertFalse(seenKeys.contains(r.get("_hoodie_record_key").toString()));
+      seenKeys.add(r.get("_hoodie_record_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+
+    // RO Input Format Read - Project only non-hoodie column
+    reloadInputFormats();
+    records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, true,
+        Arrays.asList("_row_key"));
+    assertEquals(totalRecords, records.size());
+    seenKeys = new HashSet<>();
+    for (GenericRecord r : records) {
+      assertFalse(seenKeys.contains(r.get("_row_key").toString()));
+      seenKeys.add(r.get("_row_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+
+    //RT Input Format Read - Project only non-hoodie column
+    reloadInputFormats();
+    seenKeys = new HashSet<>();
+    records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
+        jsc.hadoopConfiguration(),
+        FSUtils.getAllPartitionPaths(metaClient.getFs(), basePath, false).stream()
+            .map(f -> basePath + "/" + f).collect(Collectors.toList()),
+        basePath, rtJobConf, true, schema,  TRIP_HIVE_COLUMN_TYPES, true,
+        Arrays.asList("_row_key"));
+    assertEquals(totalRecords, records.size());
+    for (GenericRecord r : records) {
+      assertFalse(seenKeys.contains(r.get("_row_key").toString()));
+      seenKeys.add(r.get("_row_key").toString());
+    }
+    assertEquals(totalRecords, seenKeys.size());
+  }
+
+  public static class FullTestBootstrapInputProvider extends RecordDataBootstrapInputProvider {
+
+    public FullTestBootstrapInputProvider(TypedProperties props, JavaSparkContext jsc) {
+      super(props, jsc);
+    }
+
+    @Override
+    public JavaRDD<HoodieRecord> generateInputRecordRDD(String tableName, String sourceBasePath,
+        List<Pair<String, List<HoodieFileStatus>>> partitionPaths) {
+      String filePath = FileStatusUtils.toPath(partitionPaths.stream().flatMap(p -> p.getValue().stream())
+          .findAny().get().getPath()).toString();
+      ParquetFileReader reader = null;
+      try {
+        reader = ParquetFileReader.open(jsc.hadoopConfiguration(), new Path(filePath));
+      } catch (IOException e) {
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+      MessageType parquetSchema = reader.getFooter().getFileMetaData().getSchema();
+      Schema schema =  new AvroSchemaConverter().convert(parquetSchema);
+      return generateInputBatch(jsc, partitionPaths, schema);
+    }
+  }
+
+  private static JavaRDD<HoodieRecord> generateInputBatch(JavaSparkContext jsc,
+      List<Pair<String, List<HoodieFileStatus>>> partitionPaths, Schema writerSchema) {
+    List<Pair<String, Path>> fullFilePathsWithPartition = partitionPaths.stream().flatMap(p -> p.getValue().stream()
+        .map(x -> Pair.of(p.getKey(), FileStatusUtils.toPath(x.getPath())))).collect(Collectors.toList());
+    return jsc.parallelize(fullFilePathsWithPartition.stream().flatMap(p -> {
+      try {
+        Configuration conf = jsc.hadoopConfiguration();
+        AvroReadSupport.setAvroReadSchema(conf, writerSchema);
+        Iterator<GenericRecord> recIterator = new ParquetReaderIterator(
+            AvroParquetReader.<GenericRecord>builder(p.getValue()).withConf(conf).build());
+        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(recIterator, 0), false).map(gr -> {
+          try {
+            String key = gr.get("_row_key").toString();
+            String pPath = p.getKey();
+            return new HoodieRecord<>(new HoodieKey(key, pPath), new RawTripTestPayload(gr.toString(), key, pPath,
+                HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA));
+          } catch (IOException e) {
+            throw new HoodieIOException(e.getMessage(), e);
+          }
+        });
+      } catch (IOException ioe) {
+        throw new HoodieIOException(ioe.getMessage(), ioe);
+      }
+    }).collect(Collectors.toList()));
+  }
+
+  public static class TestRandomBootstapModeSelector extends BootstrapModeSelector {
+
+    private int currIdx = new Random().nextInt(2);
+
+    public TestRandomBootstapModeSelector(HoodieWriteConfig writeConfig) {
+      super(writeConfig);
+    }
+
+    @Override
+    public Map<BootstrapMode, List<String>> select(List<Pair<String, List<HoodieFileStatus>>> partitions) {
+      List<Pair<BootstrapMode, String>> selections = new ArrayList<>();
+      partitions.stream().forEach(p -> {
+        final BootstrapMode mode;
+        if (currIdx == 0) {
+          mode = BootstrapMode.RECORD_METADATA_ONLY_BOOTSTRAP;
+        } else {
+          mode = BootstrapMode.RECORD_DATA_BOOTSTRAP;
+        }
+        currIdx = (currIdx + 1) % 2;
+        selections.add(Pair.of(mode, p.getKey()));
+      });
+      return selections.stream().collect(Collectors.groupingBy(Pair::getKey, mapping(Pair::getValue, toList())));
+    }
+  }
+
+  public HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
+    HoodieWriteConfig.Builder builder = getConfigBuilder(schemaStr, IndexType.BLOOM)
+        .withExternalSchemaTrasformation(true);
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key");
+    properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "datestr");
+    builder = builder.withProps(properties);
+    return builder;
+  }
+
+  private static Dataset<Row> generateTestRawTripDataset(double timestamp, int numRecords, List<String> partitionPaths,

Review comment:
       No worries. I will take it up.







[GitHub] [hudi] vinothchandar commented on pull request #1876: [HUDI-242] Support for RFC-12/Bootstrapping of external datasets

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1876:
URL: https://github.com/apache/hudi/pull/1876#issuecomment-663787403


   @bvaradar @umehrot2 after many valiant efforts, finally rebased the original #1678 here. Will be working on getting the code review comments addressed and the tests passing over the weekend.
   Will then try to redo #1702 on top of that.
   
   
   





[GitHub] [hudi] vinothchandar commented on pull request #1876: [HUDI-242] Support for RFC-12/Bootstrapping of external datasets

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1876:
URL: https://github.com/apache/hudi/pull/1876#issuecomment-665979160


   finally @bvaradar @umehrot2 tests passing now. 

