Posted to commits@hudi.apache.org by vi...@apache.org on 2020/08/07 07:28:29 UTC
[hudi] branch master updated: [HUDI-69] Support Spark Datasource for MOR table - RDD approach (#1848)
This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4f74a84 [HUDI-69] Support Spark Datasource for MOR table - RDD approach (#1848)
4f74a84 is described below
commit 4f74a84607d46249e9bb6e1397246f8dc076b390
Author: Gary Li <ya...@gmail.com>
AuthorDate: Fri Aug 7 00:28:14 2020 -0700
[HUDI-69] Support Spark Datasource for MOR table - RDD approach (#1848)
- This PR implements the Spark Datasource for MOR tables using the RDD approach.
- Implemented SnapshotRelation
- Implemented HudiMergeOnReadRDD
- Implemented separate iterators to handle merged and unmerged record reading.
- Added TestMORDataSource to verify this feature.
- Cleaned up test file names and added tests for mixed query types
- We can now revert the change made in DefaultSource
Co-authored-by: Vinoth Chandar <vc...@confluent.io>
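
Usage sketch (not part of the commit; assumes an existing SparkSession `spark` and a hypothetical MOR table under `basePath`): with this change a snapshot read on a MOR table can select the merge behavior through the new hoodie.datasource.merge.type option added in DataSourceOptions.scala below, e.g.

    import org.apache.hudi.DataSourceReadOptions

    // hypothetical base path of an existing MOR table
    val basePath = "/tmp/hoodie_mor_table"

    // Snapshot query with the default behavior: log records are combined with
    // the base file records via the record payload (payload_combine).
    val mergedDF = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
      .option(DataSourceReadOptions.REALTIME_MERGE_OPT_KEY, DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL)
      .load(basePath + "/*/*/*/*")

    // Snapshot query that skips merging and returns base and log records as-is.
    val unmergedDF = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
      .option(DataSourceReadOptions.REALTIME_MERGE_OPT_KEY, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
      .load(basePath + "/*/*/*/*")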
---
.../hudi/testutils/HoodieClientTestBase.java | 2 +-
.../hudi/common/table/log/LogReaderUtils.java | 8 +-
.../common/testutils/HoodieTestDataGenerator.java | 15 +
.../realtime/AbstractRealtimeRecordReader.java | 9 -
.../realtime/RealtimeCompactedRecordReader.java | 2 +-
.../realtime/RealtimeUnmergedRecordReader.java | 11 +-
.../utils/HoodieRealtimeInputFormatUtils.java | 18 +-
.../utils/HoodieRealtimeRecordReaderUtils.java | 13 +
.../org/apache/hudi/AvroConversionUtils.scala | 14 +-
.../scala/org/apache/hudi/DataSourceOptions.scala | 8 +
.../main/scala/org/apache/hudi/DefaultSource.scala | 56 +--
...ptyRelation.scala => HoodieEmptyRelation.scala} | 4 +-
.../org/apache/hudi/HoodieMergeOnReadRDD.scala | 274 +++++++++++++++
...HudiSparkUtils.scala => HoodieSparkUtils.scala} | 4 +-
.../org/apache/hudi/IncrementalRelation.scala | 7 +-
.../apache/hudi/MergeOnReadSnapshotRelation.scala | 151 ++++++++
.../java/org/apache/hudi/client/TestBootstrap.java | 10 +-
...SparkUtils.scala => TestHoodieSparkUtils.scala} | 15 +-
.../apache/hudi/functional/TestCOWDataSource.scala | 197 +++++++++++
.../apache/hudi/functional/TestDataSource.scala | 337 ------------------
.../apache/hudi/functional/TestMORDataSource.scala | 391 +++++++++++++++++++++
.../hudi/functional/TestStructuredStreaming.scala | 180 ++++++++++
22 files changed, 1317 insertions(+), 409 deletions(-)
diff --git a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 667181c..203cc54 100644
--- a/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -71,7 +71,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
*/
public class HoodieClientTestBase extends HoodieClientTestHarness {
- private static final Logger LOG = LogManager.getLogger(HoodieClientTestBase.class);
+ protected static final Logger LOG = LogManager.getLogger(HoodieClientTestBase.class);
@BeforeEach
public void setUp() throws Exception {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
index 0662e68..ffc4b85 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/LogReaderUtils.java
@@ -29,9 +29,9 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
import java.io.IOException;
import java.util.List;
@@ -62,15 +62,15 @@ public class LogReaderUtils {
return writerSchema;
}
- public static Schema readLatestSchemaFromLogFiles(String basePath, List<String> deltaFilePaths, JobConf jobConf)
+ public static Schema readLatestSchemaFromLogFiles(String basePath, List<String> deltaFilePaths, Configuration config)
throws IOException {
- HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jobConf, basePath);
+ HoodieTableMetaClient metaClient = new HoodieTableMetaClient(config, basePath);
List<String> deltaPaths = deltaFilePaths.stream().map(s -> new HoodieLogFile(new Path(s)))
.sorted(HoodieLogFile.getReverseLogFileComparator()).map(s -> s.getPath().toString())
.collect(Collectors.toList());
if (deltaPaths.size() > 0) {
for (String logPath : deltaPaths) {
- FileSystem fs = FSUtils.getFs(logPath, jobConf);
+ FileSystem fs = FSUtils.getFs(logPath, config);
Schema schemaFromLogFile =
readSchemaFromLogFileInReverse(fs, metaClient.getActiveTimeline(), new Path(logPath));
if (schemaFromLogFile != null) {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 6b6074b..90b15d0 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -634,6 +634,10 @@ public class HoodieTestDataGenerator {
return generateUniqueUpdatesStream(instantTime, n, TRIP_EXAMPLE_SCHEMA).collect(Collectors.toList());
}
+ public List<HoodieRecord> generateUniqueUpdatesAsPerSchema(String instantTime, Integer n, String schemaStr) {
+ return generateUniqueUpdatesStream(instantTime, n, schemaStr).collect(Collectors.toList());
+ }
+
/**
* Generates deduped delete of keys previously inserted, randomly distributed across the keys above.
*
@@ -745,6 +749,17 @@ public class HoodieTestDataGenerator {
return result.stream();
}
+ /**
+ * Generates deduped delete records previously inserted, randomly distributed across the keys above.
+ *
+ * @param instantTime Commit Timestamp
+ * @param n Number of unique records
+ * @return List of hoodie records for delete
+ */
+ public List<HoodieRecord> generateUniqueDeleteRecords(String instantTime, Integer n) {
+ return generateUniqueDeleteRecordStream(instantTime, n).collect(Collectors.toList());
+ }
+
public boolean deleteExistingKeyIfPresent(HoodieKey key) {
Map<Integer, KeyPartition> existingKeys = existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
Integer numExistingKeys = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index 65c416c..050b91a 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.LogReaderUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.hadoop.InputSplitUtils;
-import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
import org.apache.avro.Schema;
@@ -148,12 +147,4 @@ public abstract class AbstractRealtimeRecordReader {
public Schema getHiveSchema() {
return hiveSchema;
}
-
- public long getMaxCompactionMemoryInBytes() {
- // jobConf.getMemoryForMapTask() returns in MB
- return (long) Math
- .ceil(Double.parseDouble(jobConf.get(HoodieRealtimeConfig.COMPACTION_MEMORY_FRACTION_PROP,
- HoodieRealtimeConfig.DEFAULT_COMPACTION_MEMORY_FRACTION))
- * jobConf.getMemoryForMapTask() * 1024 * 1024L);
- }
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index 78925c3..042199f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -69,7 +69,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
split.getDeltaLogPaths(),
usesCustomPayload ? getWriterSchema() : getReaderSchema(),
split.getMaxCommitTime(),
- getMaxCompactionMemoryInBytes(),
+ HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf),
Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
false,
jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
index c06bff2..76de84b 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeUnmergedRecordReader.java
@@ -63,7 +63,7 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
* clients to consume.
*
* @param split File split
- * @param job Job Configuration
+ * @param jobConf Job Configuration
* @param realReader Parquet Reader
*/
public RealtimeUnmergedRecordReader(RealtimeSplit split, JobConf job,
@@ -72,14 +72,15 @@ class RealtimeUnmergedRecordReader extends AbstractRealtimeRecordReader
this.parquetReader = new SafeParquetRecordReaderWrapper(realReader);
// Iterator for consuming records from parquet file
this.parquetRecordsIterator = new RecordReaderValueIterator<>(this.parquetReader);
- this.executor = new BoundedInMemoryExecutor<>(getMaxCompactionMemoryInBytes(), getParallelProducers(),
+ this.executor = new BoundedInMemoryExecutor<>(
+ HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf), getParallelProducers(),
Option.empty(), x -> x, new DefaultSizeEstimator<>());
// Consumer of this record reader
this.iterator = this.executor.getQueue().iterator();
- this.logRecordScanner = new HoodieUnMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(), jobConf),
+ this.logRecordScanner = new HoodieUnMergedLogRecordScanner(FSUtils.getFs(split.getPath().toString(), this.jobConf),
split.getBasePath(), split.getDeltaLogPaths(), getReaderSchema(), split.getMaxCommitTime(),
- Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
- false, jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), record -> {
+ Boolean.parseBoolean(this.jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)),
+ false, this.jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE), record -> {
// convert Hoodie log record to Hadoop AvroWritable and buffer
GenericRecord rec = (GenericRecord) record.getData().getInsertValue(getReaderSchema()).get();
ArrayWritable aWritable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(rec, getHiveSchema());
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
index cc46d96..346d7a0 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.hadoop.utils;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -34,7 +35,6 @@ import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
import org.apache.hudi.hadoop.realtime.RealtimeBootstrapBaseFileSplit;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
@@ -119,15 +119,15 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
}
// Return parquet file with a list of log files in the same file group.
- public static Map<String, List<String>> groupLogsByBaseFile(Configuration conf, Stream<FileStatus> fileStatuses) {
- Map<Path, List<FileStatus>> partitionsToParquetSplits =
- fileStatuses.collect(Collectors.groupingBy(file -> file.getPath().getParent()));
+ public static Map<HoodieBaseFile, List<String>> groupLogsByBaseFile(Configuration conf, List<HoodieBaseFile> fileStatuses) {
+ Map<Path, List<HoodieBaseFile>> partitionsToParquetSplits =
+ fileStatuses.stream().collect(Collectors.groupingBy(file -> file.getFileStatus().getPath().getParent()));
// TODO(vc): Should we handle also non-hoodie splits here?
Map<Path, HoodieTableMetaClient> partitionsToMetaClient = getTableMetaClientByBasePath(conf, partitionsToParquetSplits.keySet());
// for all unique split parents, obtain all delta files based on delta commit timeline,
// grouped on file id
- Map<String, List<String>> resultMap = new HashMap<>();
+ Map<HoodieBaseFile, List<String>> resultMap = new HashMap<>();
partitionsToParquetSplits.keySet().forEach(partitionPath -> {
// for each partition path obtain the data & log file groupings, then map back to inputsplits
HoodieTableMetaClient metaClient = partitionsToMetaClient.get(partitionPath);
@@ -144,15 +144,15 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
.orElse(Stream.empty());
// subgroup splits again by file id & match with log files.
- Map<String, List<FileStatus>> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream()
- .collect(Collectors.groupingBy(file -> FSUtils.getFileId(file.getPath().getName())));
+ Map<String, List<HoodieBaseFile>> groupedInputSplits = partitionsToParquetSplits.get(partitionPath).stream()
+ .collect(Collectors.groupingBy(file -> FSUtils.getFileId(file.getFileStatus().getPath().getName())));
latestFileSlices.forEach(fileSlice -> {
- List<FileStatus> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
+ List<HoodieBaseFile> dataFileSplits = groupedInputSplits.get(fileSlice.getFileId());
dataFileSplits.forEach(split -> {
try {
List<String> logFilePaths = fileSlice.getLogFiles().sorted(HoodieLogFile.getLogFileComparator())
.map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
- resultMap.put(split.getPath().toString(), logFilePaths);
+ resultMap.put(split, logFilePaths);
} catch (Exception e) {
throw new HoodieException("Error creating hoodie real time split ", e);
}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index e925c18..871f722 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.avro.LogicalTypes;
@@ -43,6 +44,7 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -70,6 +72,17 @@ public class HoodieRealtimeRecordReaderUtils {
}
/**
+ * get the max compaction memory in bytes from JobConf.
+ */
+ public static long getMaxCompactionMemoryInBytes(JobConf jobConf) {
+ // jobConf.getMemoryForMapTask() returns in MB
+ return (long) Math
+ .ceil(Double.parseDouble(jobConf.get(HoodieRealtimeConfig.COMPACTION_MEMORY_FRACTION_PROP,
+ HoodieRealtimeConfig.DEFAULT_COMPACTION_MEMORY_FRACTION))
+ * jobConf.getMemoryForMapTask() * 1024 * 1024L);
+ }
+
+ /**
* Prints a JSON representation of the ArrayWritable for easier debuggability.
*/
public static String arrayWritableToString(ArrayWritable writable) {
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index bdb8955..e6d6c55 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -17,7 +17,7 @@
package org.apache.hudi
-import org.apache.avro.generic.GenericRecord
+import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder, IndexedRecord}
import org.apache.hudi.common.model.HoodieKey
import org.apache.avro.Schema
import org.apache.spark.rdd.RDD
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+import scala.collection.JavaConverters._
object AvroConversionUtils {
@@ -78,4 +79,15 @@ object AvroConversionUtils {
def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
}
+
+ def buildAvroRecordBySchema(record: IndexedRecord,
+ requiredSchema: Schema,
+ requiredPos: List[Int],
+ recordBuilder: GenericRecordBuilder): GenericRecord = {
+ val requiredFields = requiredSchema.getFields.asScala
+ assert(requiredFields.length == requiredPos.length)
+ val positionIterator = requiredPos.iterator
+ requiredFields.foreach(f => recordBuilder.set(f, record.get(positionIterator.next())))
+ recordBuilder.build()
+ }
}
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 4d94463..917bfed 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -51,6 +51,14 @@ object DataSourceReadOptions {
val QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental"
val DEFAULT_QUERY_TYPE_OPT_VAL: String = QUERY_TYPE_SNAPSHOT_OPT_VAL
+ /**
+ * For snapshot queries on merge-on-read tables. Use this key to define how log records are merged with base file records.
+ */
+ val REALTIME_MERGE_OPT_KEY = "hoodie.datasource.merge.type"
+ val REALTIME_SKIP_MERGE_OPT_VAL = "skip_merge"
+ val REALTIME_PAYLOAD_COMBINE_OPT_VAL = "payload_combine"
+ val DEFAULT_REALTIME_MERGE_OPT_VAL = REALTIME_PAYLOAD_COMBINE_OPT_VAL
+
@Deprecated
val VIEW_TYPE_OPT_KEY = "hoodie.datasource.view.type"
@Deprecated
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index e26c1c8..10be305 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -18,8 +18,9 @@
package org.apache.hudi
import org.apache.hudi.DataSourceReadOptions._
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.log4j.LogManager
@@ -60,26 +61,20 @@ class DefaultSource extends RelationProvider
throw new HoodieException("'path' must be specified.")
}
+ val fs = FSUtils.getFs(path.get, sqlContext.sparkContext.hadoopConfiguration)
+ val globPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(Seq(path.get), fs)
+ val tablePath = DataSourceUtils.getTablePath(fs, globPaths.toArray)
if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_SNAPSHOT_OPT_VAL)) {
- // this is just effectively RO view only, where `path` can contain a mix of
- // non-hoodie/hoodie path files. set the path filter up
- sqlContext.sparkContext.hadoopConfiguration.setClass(
- "mapreduce.input.pathFilter.class",
- classOf[HoodieROTablePathFilter],
- classOf[org.apache.hadoop.fs.PathFilter])
-
- log.info("Constructing hoodie (as parquet) data source with options :" + parameters)
- log.warn("Snapshot view not supported yet via data source, for MERGE_ON_READ tables. " +
- "Please query the Hive table registered using Spark SQL.")
- // simply return as a regular parquet relation
- DataSource.apply(
- sparkSession = sqlContext.sparkSession,
- userSpecifiedSchema = Option(schema),
- className = "parquet",
- options = parameters)
- .resolveRelation()
+ val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath)
+ if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
+ new MergeOnReadSnapshotRelation(sqlContext, optParams, schema, globPaths, metaClient)
+ } else {
+ getBaseFileOnlyView(sqlContext, parameters, schema)
+ }
+ } else if(parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)) {
+ getBaseFileOnlyView(sqlContext, parameters, schema)
} else if (parameters(QUERY_TYPE_OPT_KEY).equals(QUERY_TYPE_INCREMENTAL_OPT_VAL)) {
- new IncrementalRelation(sqlContext, path.get, optParams, schema)
+ new IncrementalRelation(sqlContext, tablePath, optParams, schema)
} else {
throw new HoodieException("Invalid query type :" + parameters(QUERY_TYPE_OPT_KEY))
}
@@ -107,7 +102,7 @@ class DefaultSource extends RelationProvider
df: DataFrame): BaseRelation = {
val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams)
HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)
- new HudiEmptyRelation(sqlContext, df.schema)
+ new HoodieEmptyRelation(sqlContext, df.schema)
}
override def createSink(sqlContext: SQLContext,
@@ -123,4 +118,25 @@ class DefaultSource extends RelationProvider
}
override def shortName(): String = "hudi"
+
+ private def getBaseFileOnlyView(sqlContext: SQLContext,
+ optParams: Map[String, String],
+ schema: StructType): BaseRelation = {
+ log.warn("Loading Base File Only View.")
+ // this is just effectively RO view only, where `path` can contain a mix of
+ // non-hoodie/hoodie path files. set the path filter up
+ sqlContext.sparkContext.hadoopConfiguration.setClass(
+ "mapreduce.input.pathFilter.class",
+ classOf[HoodieROTablePathFilter],
+ classOf[org.apache.hadoop.fs.PathFilter])
+
+ log.info("Constructing hoodie (as parquet) data source with options :" + optParams)
+ // simply return as a regular parquet relation
+ DataSource.apply(
+ sparkSession = sqlContext.sparkSession,
+ userSpecifiedSchema = Option(schema),
+ className = "parquet",
+ options = optParams)
+ .resolveRelation()
+ }
}
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HudiEmptyRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala
similarity index 90%
rename from hudi-spark/src/main/scala/org/apache/hudi/HudiEmptyRelation.scala
rename to hudi-spark/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala
index 8ddbe46..4642993 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HudiEmptyRelation.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieEmptyRelation.scala
@@ -28,8 +28,8 @@ import org.apache.spark.sql.types.StructType
* @param sqlContext Spark SQL Context
* @param userSchema Users data schema
*/
-class HudiEmptyRelation(val sqlContext: SQLContext,
- val userSchema: StructType) extends BaseRelation {
+class HoodieEmptyRelation(val sqlContext: SQLContext,
+ val userSchema: StructType) extends BaseRelation {
override def schema: StructType = userSchema
}
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
new file mode 100644
index 0000000..f272084
--- /dev/null
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.config.HoodieRealtimeConfig
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.avro.{AvroDeserializer, AvroSerializer}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.Try
+
+case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition
+
+class HoodieMergeOnReadRDD(@transient sc: SparkContext,
+ @transient config: Configuration,
+ fullSchemaFileReader: PartitionedFile => Iterator[Any],
+ requiredSchemaFileReader: PartitionedFile => Iterator[Any],
+ tableState: HoodieMergeOnReadTableState)
+ extends RDD[InternalRow](sc, Nil) {
+
+ private val confBroadcast = sc.broadcast(new SerializableWritable(config))
+
+ override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+ val mergeParquetPartition = split.asInstanceOf[HoodieMergeOnReadPartition]
+ mergeParquetPartition.split match {
+ case dataFileOnlySplit if dataFileOnlySplit.logPaths.isEmpty =>
+ read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader)
+ case skipMergeSplit if skipMergeSplit.mergeType
+ .equals(DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL) =>
+ skipMergeFileIterator(
+ skipMergeSplit,
+ read(mergeParquetPartition.split.dataFile, requiredSchemaFileReader),
+ getConfig
+ )
+ case payloadCombineSplit if payloadCombineSplit.mergeType
+ .equals(DataSourceReadOptions.REALTIME_PAYLOAD_COMBINE_OPT_VAL) =>
+ payloadCombineFileIterator(
+ payloadCombineSplit,
+ read(mergeParquetPartition.split.dataFile, fullSchemaFileReader),
+ getConfig
+ )
+ case _ => throw new HoodieException(s"Unable to select an Iterator to read the Hoodie MOR File Split for " +
+ s"file path: ${mergeParquetPartition.split.dataFile.filePath}" +
+ s"log paths: ${mergeParquetPartition.split.logPaths.toString}" +
+ s"hoodie table path: ${mergeParquetPartition.split.tablePath}" +
+ s"spark partition Index: ${mergeParquetPartition.index}" +
+ s"merge type: ${mergeParquetPartition.split.mergeType}")
+ }
+ }
+
+ override protected def getPartitions: Array[Partition] = {
+ tableState
+ .hoodieRealtimeFileSplits
+ .zipWithIndex
+ .map(file => HoodieMergeOnReadPartition(file._2, file._1)).toArray
+ }
+
+ private def getConfig: Configuration = {
+ val conf = confBroadcast.value.value
+ HoodieMergeOnReadRDD.CONFIG_INSTANTIATION_LOCK.synchronized {
+ new Configuration(conf)
+ }
+ }
+
+ private def read(partitionedFile: PartitionedFile,
+ readFileFunction: PartitionedFile => Iterator[Any]): Iterator[InternalRow] = {
+ val fileIterator = readFileFunction(partitionedFile)
+ val rows = fileIterator.flatMap(_ match {
+ case r: InternalRow => Seq(r)
+ case b: ColumnarBatch => b.rowIterator().asScala
+ })
+ rows
+ }
+
+ private def skipMergeFileIterator(split: HoodieMergeOnReadFileSplit,
+ baseFileIterator: Iterator[InternalRow],
+ config: Configuration): Iterator[InternalRow] =
+ new Iterator[InternalRow] {
+ private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
+ private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
+ private val requiredFieldPosition =
+ tableState.requiredStructSchema
+ .map(f => tableAvroSchema.getField(f.name).pos()).toList
+ private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
+ private val deserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
+ private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
+ private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
+ private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
+
+ private var recordToLoad: InternalRow = _
+
+ @scala.annotation.tailrec
+ override def hasNext: Boolean = {
+ if (baseFileIterator.hasNext) {
+ recordToLoad = baseFileIterator.next()
+ true
+ } else {
+ if (logRecordsKeyIterator.hasNext) {
+ val curAvrokey = logRecordsKeyIterator.next()
+ val curAvroRecord = logRecords.get(curAvrokey).getData.getInsertValue(tableAvroSchema)
+ if (!curAvroRecord.isPresent) {
+ // delete record found, skipping
+ this.hasNext
+ } else {
+ val requiredAvroRecord = AvroConversionUtils
+ .buildAvroRecordBySchema(curAvroRecord.get(), requiredAvroSchema, requiredFieldPosition, recordBuilder)
+ recordToLoad = unsafeProjection(deserializer.deserialize(requiredAvroRecord).asInstanceOf[InternalRow])
+ true
+ }
+ } else {
+ false
+ }
+ }
+ }
+
+ override def next(): InternalRow = {
+ recordToLoad
+ }
+ }
+
+ private def payloadCombineFileIterator(split: HoodieMergeOnReadFileSplit,
+ baseFileIterator: Iterator[InternalRow],
+ config: Configuration): Iterator[InternalRow] =
+ new Iterator[InternalRow] {
+ private val tableAvroSchema = new Schema.Parser().parse(tableState.tableAvroSchema)
+ private val requiredAvroSchema = new Schema.Parser().parse(tableState.requiredAvroSchema)
+ private val requiredFieldPosition =
+ tableState.requiredStructSchema
+ .map(f => tableAvroSchema.getField(f.name).pos()).toList
+ private val serializer = new AvroSerializer(tableState.tableStructSchema, tableAvroSchema, false)
+ private val requiredDeserializer = new AvroDeserializer(requiredAvroSchema, tableState.requiredStructSchema)
+ private val recordBuilder = new GenericRecordBuilder(requiredAvroSchema)
+ private val unsafeProjection = UnsafeProjection.create(tableState.requiredStructSchema)
+ private val logRecords = HoodieMergeOnReadRDD.scanLog(split, tableAvroSchema, config).getRecords
+ private val logRecordsKeyIterator = logRecords.keySet().iterator().asScala
+ private val keyToSkip = mutable.Set.empty[String]
+
+ private var recordToLoad: InternalRow = _
+
+ @scala.annotation.tailrec
+ override def hasNext: Boolean = {
+ if (baseFileIterator.hasNext) {
+ val curRow = baseFileIterator.next()
+ val curKey = curRow.getString(HOODIE_RECORD_KEY_COL_POS)
+ if (logRecords.containsKey(curKey)) {
+ // duplicate key found, merging
+ keyToSkip.add(curKey)
+ val mergedAvroRecord = mergeRowWithLog(curRow, curKey)
+ if (!mergedAvroRecord.isPresent) {
+ // deleted
+ this.hasNext
+ } else {
+ // load merged record as InternalRow with required schema
+ val requiredAvroRecord = AvroConversionUtils
+ .buildAvroRecordBySchema(
+ mergedAvroRecord.get(),
+ requiredAvroSchema,
+ requiredFieldPosition,
+ recordBuilder
+ )
+ recordToLoad = unsafeProjection(requiredDeserializer
+ .deserialize(requiredAvroRecord).asInstanceOf[InternalRow])
+ true
+ }
+ } else {
+ // No merge needed, load current row with required schema
+ recordToLoad = unsafeProjection(createRowWithRequiredSchema(curRow))
+ true
+ }
+ } else {
+ if (logRecordsKeyIterator.hasNext) {
+ val curKey = logRecordsKeyIterator.next()
+ if (keyToSkip.contains(curKey)) {
+ this.hasNext
+ } else {
+ val insertAvroRecord =
+ logRecords.get(curKey).getData.getInsertValue(tableAvroSchema)
+ if (!insertAvroRecord.isPresent) {
+ // stand alone delete record, skipping
+ this.hasNext
+ } else {
+ val requiredAvroRecord = AvroConversionUtils
+ .buildAvroRecordBySchema(
+ insertAvroRecord.get(),
+ requiredAvroSchema,
+ requiredFieldPosition,
+ recordBuilder
+ )
+ recordToLoad = unsafeProjection(requiredDeserializer
+ .deserialize(requiredAvroRecord).asInstanceOf[InternalRow])
+ true
+ }
+ }
+ } else {
+ false
+ }
+ }
+ }
+
+ override def next(): InternalRow = recordToLoad
+
+ private def createRowWithRequiredSchema(row: InternalRow): InternalRow = {
+ val rowToReturn = new SpecificInternalRow(tableState.requiredStructSchema)
+ val posIterator = requiredFieldPosition.iterator
+ var curIndex = 0
+ tableState.requiredStructSchema.foreach(
+ f => {
+ val curPos = posIterator.next()
+ val curField = row.get(curPos, f.dataType)
+ rowToReturn.update(curIndex, curField)
+ curIndex = curIndex + 1
+ }
+ )
+ rowToReturn
+ }
+
+ private def mergeRowWithLog(curRow: InternalRow, curKey: String) = {
+ val historyAvroRecord = serializer.serialize(curRow).asInstanceOf[GenericRecord]
+ logRecords.get(curKey).getData.combineAndGetUpdateValue(historyAvroRecord, tableAvroSchema)
+ }
+ }
+}
+
+private object HoodieMergeOnReadRDD {
+ val CONFIG_INSTANTIATION_LOCK = new Object()
+
+ def scanLog(split: HoodieMergeOnReadFileSplit, logSchema: Schema, config: Configuration): HoodieMergedLogRecordScanner = {
+ val fs = FSUtils.getFs(split.tablePath, config)
+ new HoodieMergedLogRecordScanner(
+ fs,
+ split.tablePath,
+ split.logPaths.get.asJava,
+ logSchema,
+ split.latestCommit,
+ split.maxCompactionMemoryInBytes,
+ Try(config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
+ HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED).toBoolean).getOrElse(false),
+ false,
+ config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
+ HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE),
+ config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
+ HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+ }
+}
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HudiSparkUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
similarity index 96%
rename from hudi-spark/src/main/scala/org/apache/hudi/HudiSparkUtils.scala
rename to hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 861de14..26babd8 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HudiSparkUtils.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -27,9 +27,9 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
import scala.collection.JavaConverters._
-object HudiSparkUtils {
+object HoodieSparkUtils {
- def getHudiMetadataSchema: StructType = {
+ def getMetaSchema: StructType = {
StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
StructField(col, StringType, nullable = true)
}))
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index 436895b..338a54e 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -17,14 +17,14 @@
package org.apache.hudi
-import org.apache.hadoop.fs.GlobPattern
-import org.apache.hadoop.fs.Path
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieRecord, HoodieTableType}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.table.HoodieTable
+
+import org.apache.hadoop.fs.GlobPattern
import org.apache.log4j.LogManager
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
@@ -47,7 +47,8 @@ class IncrementalRelation(val sqlContext: SQLContext,
private val log = LogManager.getLogger(classOf[IncrementalRelation])
- private val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
+ private val metaClient =
+ new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration, basePath, true)
// MOR tables not supported yet
if (metaClient.getTableType.equals(HoodieTableType.MERGE_ON_READ)) {
throw new HoodieException("Incremental view not implemented yet, for merge-on-read tables")
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
new file mode 100644
index 0000000..c1a6acd
--- /dev/null
+++ b/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.model.HoodieBaseFile
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView
+import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.mapred.JobConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class HoodieMergeOnReadFileSplit(dataFile: PartitionedFile,
+ logPaths: Option[List[String]],
+ latestCommit: String,
+ tablePath: String,
+ maxCompactionMemoryInBytes: Long,
+ mergeType: String)
+
+case class HoodieMergeOnReadTableState(tableStructSchema: StructType,
+ requiredStructSchema: StructType,
+ tableAvroSchema: String,
+ requiredAvroSchema: String,
+ hoodieRealtimeFileSplits: List[HoodieMergeOnReadFileSplit])
+
+class MergeOnReadSnapshotRelation(val sqlContext: SQLContext,
+ val optParams: Map[String, String],
+ val userSchema: StructType,
+ val globPaths: Seq[Path],
+ val metaClient: HoodieTableMetaClient)
+ extends BaseRelation with PrunedFilteredScan with Logging {
+
+ private val conf = sqlContext.sparkContext.hadoopConfiguration
+ private val jobConf = new JobConf(conf)
+ // use schema from latest metadata, if not present, read schema from the data file
+ private val schemaUtil = new TableSchemaResolver(metaClient)
+ private val tableAvroSchema = schemaUtil.getTableAvroSchema
+ private val tableStructSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableAvroSchema)
+ private val mergeType = optParams.getOrElse(
+ DataSourceReadOptions.REALTIME_MERGE_OPT_KEY,
+ DataSourceReadOptions.DEFAULT_REALTIME_MERGE_OPT_VAL)
+ private val maxCompactionMemoryInBytes = getMaxCompactionMemoryInBytes(jobConf)
+ private val fileIndex = buildFileIndex()
+
+ override def schema: StructType = tableStructSchema
+
+ override def needConversion: Boolean = false
+
+ override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {
+ log.debug(s" buildScan requiredColumns = ${requiredColumns.mkString(",")}")
+ log.debug(s" buildScan filters = ${filters.mkString(",")}")
+ var requiredStructSchema = StructType(Seq())
+ requiredColumns.foreach(col => {
+ val field = tableStructSchema.find(_.name == col)
+ if (field.isDefined) {
+ requiredStructSchema = requiredStructSchema.add(field.get)
+ }
+ })
+ val requiredAvroSchema = AvroConversionUtils
+ .convertStructTypeToAvroSchema(requiredStructSchema, tableAvroSchema.getName, tableAvroSchema.getNamespace)
+ val hoodieTableState = HoodieMergeOnReadTableState(
+ tableStructSchema,
+ requiredStructSchema,
+ tableAvroSchema.toString,
+ requiredAvroSchema.toString,
+ fileIndex
+ )
+ val fullSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
+ sparkSession = sqlContext.sparkSession,
+ dataSchema = tableStructSchema,
+ partitionSchema = StructType(Nil),
+ requiredSchema = tableStructSchema,
+ filters = Seq(),
+ options = optParams,
+ hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+ )
+ val requiredSchemaParquetReader = new ParquetFileFormat().buildReaderWithPartitionValues(
+ sparkSession = sqlContext.sparkSession,
+ dataSchema = tableStructSchema,
+ partitionSchema = StructType(Nil),
+ requiredSchema = requiredStructSchema,
+ filters = filters,
+ options = optParams,
+ hadoopConf = sqlContext.sparkSession.sessionState.newHadoopConf()
+ )
+
+ // Follow the implementation of Spark internal HadoopRDD to handle the broadcast configuration.
+ FileSystem.getLocal(jobConf)
+ SparkHadoopUtil.get.addCredentials(jobConf)
+ val rdd = new HoodieMergeOnReadRDD(
+ sqlContext.sparkContext,
+ jobConf,
+ fullSchemaParquetReader,
+ requiredSchemaParquetReader,
+ hoodieTableState
+ )
+ rdd.asInstanceOf[RDD[Row]]
+ }
+
+ def buildFileIndex(): List[HoodieMergeOnReadFileSplit] = {
+ val inMemoryFileIndex = HoodieSparkUtils.createInMemoryFileIndex(sqlContext.sparkSession, globPaths)
+ val fileStatuses = inMemoryFileIndex.allFiles()
+ if (fileStatuses.isEmpty) {
+ throw new HoodieException("No files found for reading in user provided path.")
+ }
+
+ val fsView = new HoodieTableFileSystemView(metaClient,
+ metaClient.getActiveTimeline.getCommitsTimeline
+ .filterCompletedInstants, fileStatuses.toArray)
+ val latestFiles: List[HoodieBaseFile] = fsView.getLatestBaseFiles.iterator().asScala.toList
+ val latestCommit = fsView.getLastInstant.get().getTimestamp
+ val fileGroup = HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(conf, latestFiles.asJava).asScala
+ val fileSplits = fileGroup.map(kv => {
+ val baseFile = kv._1
+ val logPaths = if (kv._2.isEmpty) Option.empty else Option(kv._2.asScala.toList)
+ val partitionedFile = PartitionedFile(InternalRow.empty, baseFile.getPath, 0, baseFile.getFileLen)
+ HoodieMergeOnReadFileSplit(partitionedFile, logPaths, latestCommit,
+ metaClient.getBasePath, maxCompactionMemoryInBytes, mergeType)
+ }).toList
+ fileSplits
+ }
+}
diff --git a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
index 0aa8ca4..4e1984c 100644
--- a/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
+++ b/hudi-spark/src/test/java/org/apache/hudi/client/TestBootstrap.java
@@ -130,14 +130,7 @@ public class TestBootstrap extends HoodieClientTestBase {
public void setUp() throws Exception {
bootstrapBasePath = tmpFolder.toAbsolutePath().toString() + "/data";
initPath();
- spark = SparkSession.builder()
- .appName("Bootstrap test")
- .master("local[2]")
- .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .getOrCreate();
- jsc = new JavaSparkContext(spark.sparkContext());
- sqlContext = spark.sqlContext();
- hadoopConf = spark.sparkContext().hadoopConfiguration();
+ initSparkContexts();
initTestDataGenerator();
initMetaClient();
// initialize parquet input format
@@ -146,6 +139,7 @@ public class TestBootstrap extends HoodieClientTestBase {
@AfterEach
public void tearDown() throws IOException {
+ cleanupSparkContexts();
cleanupClients();
cleanupTestDataGenerator();
}
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/TestHudiSparkUtils.scala b/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
similarity index 89%
rename from hudi-spark/src/test/scala/org/apache/hudi/TestHudiSparkUtils.scala
rename to hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
index 6b1a178..d5da676 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/TestHudiSparkUtils.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
@@ -28,7 +28,7 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.io.TempDir
-class TestHudiSparkUtils {
+class TestHoodieSparkUtils {
@Test
def testGlobPaths(@TempDir tempDir: File): Unit = {
@@ -48,29 +48,29 @@ class TestHudiSparkUtils {
files.foreach(file => new File(file.toUri).createNewFile())
var paths = Seq(tempDir.getAbsolutePath + "/*")
- var globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths,
+ var globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
new Path(paths.head).getFileSystem(new Configuration()))
assertEquals(folders.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString))
paths = Seq(tempDir.getAbsolutePath + "/*/*")
- globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths,
+ globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
new Path(paths.head).getFileSystem(new Configuration()))
assertEquals(files.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString))
paths = Seq(tempDir.getAbsolutePath + "/folder1/*")
- globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths,
+ globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
new Path(paths.head).getFileSystem(new Configuration()))
assertEquals(Seq(files(0), files(1)).sortWith(_.toString < _.toString),
globbedPaths.sortWith(_.toString < _.toString))
paths = Seq(tempDir.getAbsolutePath + "/folder2/*")
- globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths,
+ globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
new Path(paths.head).getFileSystem(new Configuration()))
assertEquals(Seq(files(2), files(3)).sortWith(_.toString < _.toString),
globbedPaths.sortWith(_.toString < _.toString))
paths = Seq(tempDir.getAbsolutePath + "/folder1/*", tempDir.getAbsolutePath + "/folder2/*")
- globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths,
+ globbedPaths = HoodieSparkUtils.checkAndGlobPathIfNecessary(paths,
new Path(paths.head).getFileSystem(new Configuration()))
assertEquals(files.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString))
}
@@ -98,8 +98,9 @@ class TestHudiSparkUtils {
folders.foreach(folder => new File(folder.toUri).mkdir())
files.foreach(file => new File(file.toUri).createNewFile())
- val index = HudiSparkUtils.createInMemoryFileIndex(spark, Seq(folders(0), folders(1)))
+ val index = HoodieSparkUtils.createInMemoryFileIndex(spark, Seq(folders(0), folders(1)))
val indexedFilePaths = index.allFiles().map(fs => fs.getPath)
assertEquals(files.sortWith(_.toString < _.toString), indexedFilePaths.sortWith(_.toString < _.toString))
+ spark.stop()
}
}
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
new file mode 100644
index 0000000..c1e45c3
--- /dev/null
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.log4j.LogManager
+import org.apache.spark.sql._
+import org.apache.spark.sql.functions.col
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Basic tests on the spark datasource for COW table.
+ */
+class TestCOWDataSource extends HoodieClientTestBase {
+ private val log = LogManager.getLogger(getClass)
+ var spark: SparkSession = null
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+ DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
+ HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
+ )
+
+ @BeforeEach override def setUp() {
+ initPath()
+ initSparkContexts()
+ spark = sqlContext.sparkSession
+ initTestDataGenerator()
+ initFileSystem()
+ }
+
+ @AfterEach override def tearDown() = {
+ cleanupSparkContexts()
+ cleanupTestDataGenerator()
+ cleanupFileSystem()
+ }
+
+ @Test def testShortNameStorage() {
+ // Insert Operation
+ val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+ val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+ inputDF.write.format("hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+ }
+
+ @Test def testCopyOnWriteStorage() {
+ // Insert Operation
+ val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+ inputDF1.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+ val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+
+ // Snapshot query
+ val snapshotDF1 = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
+ assertEquals(100, snapshotDF1.count())
+
+ val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
+ val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+ val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
+
+ // Upsert Operation
+ inputDF2.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+ assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size())
+
+ // Snapshot Query
+ val snapshotDF2 = spark.read.format("org.apache.hudi")
+ .load(basePath + "/*/*/*/*")
+ assertEquals(100, snapshotDF2.count()) // still 100, since we only updated
+
+ // Read Incremental Query
+ // we have 2 commits, try pulling the first commit (which is not the latest)
+ val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").get(0)
+ val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
+ .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit)
+ .load(basePath)
+ assertEquals(100, hoodieIncViewDF1.count()) // 100 initial inserts must be pulled
+ var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
+ assertEquals(1, countsPerCommit.length)
+ assertEquals(firstCommit, countsPerCommit(0).get(0))
+
+ // Upsert an empty dataFrame
+ val emptyRecords = recordsToStrings(dataGen.generateUpdates("002", 0)).toList
+ val emptyDF = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
+ emptyDF.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ // pull the latest commit
+ val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+ .load(basePath)
+
+ assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must be pulled
+ countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
+ assertEquals(1, countsPerCommit.length)
+ assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
+
+ // pull the latest commit within certain partitions
+ val hoodieIncViewDF3 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+ .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2016/*/*/*")
+ .load(basePath)
+ assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2016")).count(), hoodieIncViewDF3.count())
+
+ val timeTravelDF = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
+ .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit)
+ .load(basePath)
+ assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be pulled
+ }
+
+ @Test def testDropInsertDup(): Unit = {
+ val insert1Cnt = 10
+ val insert2DupKeyCnt = 9
+ val insert2NewKeyCnt = 2
+
+ val totalUniqueKeyToGenerate = insert1Cnt + insert2NewKeyCnt
+ val allRecords = dataGen.generateInserts("001", totalUniqueKeyToGenerate)
+ val inserts1 = allRecords.subList(0, insert1Cnt)
+ val inserts2New = dataGen.generateSameKeyInserts("002", allRecords.subList(insert1Cnt, insert1Cnt + insert2NewKeyCnt))
+ val inserts2Dup = dataGen.generateSameKeyInserts("002", inserts1.subList(0, insert2DupKeyCnt))
+
+ val records1 = recordsToStrings(inserts1).toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+ inputDF1.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ val hoodieROViewDF1 = spark.read.format("org.apache.hudi")
+ .load(basePath + "/*/*/*/*")
+ assertEquals(insert1Cnt, hoodieROViewDF1.count())
+
+ val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+ val records2 = recordsToStrings(inserts2Dup ++ inserts2New).toList
+ val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+ inputDF2.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY, "true")
+ .mode(SaveMode.Append)
+ .save(basePath)
+ val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
+ .load(basePath + "/*/*/*/*")
+ assertEquals(hoodieROViewDF2.count(), totalUniqueKeyToGenerate)
+
+ val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+ .load(basePath)
+ assertEquals(hoodieIncViewDF2.count(), insert2NewKeyCnt)
+ }
+}
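For reference, a minimal sketch (not part of this commit) of the incremental query pattern the tests above verify, assuming a Hudi COW table already written with the same write options; the base path, begin instant, and partition glob below are illustrative placeholders only:

import org.apache.hudi.DataSourceReadOptions
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hudi-incremental-sketch").master("local[2]").getOrCreate()
val basePath = "/tmp/hoodie_test"    // hypothetical table base path
val beginInstant = "20200807000000"  // hypothetical commit instant; records written after it are pulled

// Incremental queries load the table base path directly (no partition glob) and return
// only the records written after BEGIN_INSTANTTIME, optionally bounded by END_INSTANTTIME.
val incDF = spark.read.format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
  .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginInstant)
  .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2016/*/*/*") // optional: restrict the pull to matching partitions
  .load(basePath)
incDF.groupBy("_hoodie_commit_time").count().show()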
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
deleted file mode 100644
index 48582c1..0000000
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
+++ /dev/null
@@ -1,337 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.functional
-
-
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator
-import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
-import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.exception.TableNotFoundException
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
-import org.apache.log4j.LogManager
-import org.apache.spark.sql._
-import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.io.TempDir
-import org.junit.jupiter.api.{BeforeEach, Test}
-
-import scala.collection.JavaConversions._
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, Future}
-
-/**
- * Basic tests on the spark datasource
- */
-class TestDataSource {
- private val log = LogManager.getLogger(getClass)
-
- var spark: SparkSession = null
- var dataGen: HoodieTestDataGenerator = null
- val commonOpts = Map(
- "hoodie.insert.shuffle.parallelism" -> "4",
- "hoodie.upsert.shuffle.parallelism" -> "4",
- DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
- DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
- DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
- HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
- )
- var basePath: String = null
- var fs: FileSystem = null
-
- @BeforeEach def initialize(@TempDir tempDir: java.nio.file.Path) {
- spark = SparkSession.builder
- .appName("Hoodie Datasource test")
- .master("local[2]")
- .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .getOrCreate
- dataGen = new HoodieTestDataGenerator()
- basePath = tempDir.toAbsolutePath.toString
- fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
- }
-
- @Test def testShortNameStorage() {
- // Insert Operation
- val records = recordsToStrings(dataGen.generateInserts("000", 100)).toList
- val inputDF: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records, 2))
- inputDF.write.format("hudi")
- .options(commonOpts)
- .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
- .mode(SaveMode.Overwrite)
- .save(basePath)
-
- assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
- }
-
- @Test def testCopyOnWriteStorage() {
- // Insert Operation
- val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
- val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
- inputDF1.write.format("org.apache.hudi")
- .options(commonOpts)
- .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
- .mode(SaveMode.Overwrite)
- .save(basePath)
-
- assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
- val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
-
- // Read RO View
- val hoodieROViewDF1 = spark.read.format("org.apache.hudi")
- .load(basePath + "/*/*/*/*");
- assertEquals(100, hoodieROViewDF1.count())
-
- val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
- val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
- val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
-
- // Upsert Operation
- inputDF2.write.format("org.apache.hudi")
- .options(commonOpts)
- .mode(SaveMode.Append)
- .save(basePath)
-
- val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, basePath)
- assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").size())
-
- // Read RO View
- val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
- .load(basePath + "/*/*/*/*");
- assertEquals(100, hoodieROViewDF2.count()) // still 100, since we only updated
-
- // Read Incremental View
- // we have 2 commits, try pulling the first commit (which is not the latest)
- val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").get(0);
- val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
- .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
- .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit)
- .load(basePath);
- assertEquals(100, hoodieIncViewDF1.count()) // 100 initial inserts must be pulled
- var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect();
- assertEquals(1, countsPerCommit.length)
- assertEquals(firstCommit, countsPerCommit(0).get(0))
-
- // Upsert an empty dataFrame
- val emptyRecords = recordsToStrings(dataGen.generateUpdates("002", 0)).toList
- val emptyDF: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(emptyRecords, 1))
- emptyDF.write.format("org.apache.hudi")
- .options(commonOpts)
- .mode(SaveMode.Append)
- .save(basePath)
-
- // pull the latest commit
- val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
- .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
- .load(basePath);
-
- assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must be pulled
- countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect();
- assertEquals(1, countsPerCommit.length)
- assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
-
- // pull the latest commit within certain partitions
- val hoodieIncViewDF3 = spark.read.format("org.apache.hudi")
- .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
- .option(DataSourceReadOptions.INCR_PATH_GLOB_OPT_KEY, "/2016/*/*/*")
- .load(basePath);
- assertEquals(hoodieIncViewDF2.filter(col("_hoodie_partition_path").contains("2016")).count(), hoodieIncViewDF3.count())
- }
-
- @Test def testMergeOnReadStorage() {
- // Bulk Insert Operation
- val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
- val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
- inputDF1.write.format("org.apache.hudi")
- .options(commonOpts)
- .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
- .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
- .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
- .mode(SaveMode.Overwrite)
- .save(basePath)
-
- assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
-
- // Read RO View
- val hoodieROViewDF1 = spark.read.format("org.apache.hudi").load(basePath + "/*/*/*/*")
- assertEquals(100, hoodieROViewDF1.count()) // still 100, since we only updated
- }
-
- @Test def testDropInsertDup(): Unit = {
- val insert1Cnt = 10
- val insert2DupKeyCnt = 9
- val insert2NewKeyCnt = 2
-
- val totalUniqueKeyToGenerate = insert1Cnt + insert2NewKeyCnt
- val allRecords = dataGen.generateInserts("001", totalUniqueKeyToGenerate)
- val inserts1 = allRecords.subList(0, insert1Cnt)
- val inserts2New = dataGen.generateSameKeyInserts("002", allRecords.subList(insert1Cnt, insert1Cnt + insert2NewKeyCnt))
- val inserts2Dup = dataGen.generateSameKeyInserts("002", inserts1.subList(0, insert2DupKeyCnt))
-
- val records1 = recordsToStrings(inserts1).toList
- val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
- inputDF1.write.format("org.apache.hudi")
- .options(commonOpts)
- .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
- .mode(SaveMode.Overwrite)
- .save(basePath)
- val hoodieROViewDF1 = spark.read.format("org.apache.hudi")
- .load(basePath + "/*/*/*/*")
- assertEquals(insert1Cnt, hoodieROViewDF1.count())
-
- val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, basePath)
- val records2 = recordsToStrings(inserts2Dup ++ inserts2New).toList
- val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
- inputDF2.write.format("org.apache.hudi")
- .options(commonOpts)
- .option(DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY, "true")
- .mode(SaveMode.Append)
- .save(basePath)
- val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
- .load(basePath + "/*/*/*/*")
- assertEquals(hoodieROViewDF2.count(), totalUniqueKeyToGenerate)
-
- val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
- .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
- .load(basePath)
- assertEquals(hoodieIncViewDF2.count(), insert2NewKeyCnt)
- }
-
- @Test
- def testStructuredStreaming(): Unit = {
- fs.delete(new Path(basePath), true)
- val sourcePath = basePath + "/source"
- val destPath = basePath + "/dest"
- fs.mkdirs(new Path(sourcePath))
-
- // First chunk of data
- val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
- val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
-
- // Second chunk of data
- val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
- val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
- val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
-
- // define the source of streaming
- val streamingInput =
- spark.readStream
- .schema(inputDF1.schema)
- .json(sourcePath)
-
- val f1 = Future {
- println("streaming starting")
- //'writeStream' can be called only on streaming Dataset/DataFrame
- streamingInput
- .writeStream
- .format("org.apache.hudi")
- .options(commonOpts)
- .trigger(new ProcessingTime(100))
- .option("checkpointLocation", basePath + "/checkpoint")
- .outputMode(OutputMode.Append)
- .start(destPath)
- .awaitTermination(10000)
- println("streaming ends")
- }
-
- val f2 = Future {
- inputDF1.write.mode(SaveMode.Append).json(sourcePath)
- // wait for spark streaming to process one microbatch
- val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5);
- assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000"))
- val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, destPath)
- // Read RO View
- val hoodieROViewDF1 = spark.read.format("org.apache.hudi")
- .load(destPath + "/*/*/*/*")
- assert(hoodieROViewDF1.count() == 100)
-
- inputDF2.write.mode(SaveMode.Append).json(sourcePath)
- // wait for spark streaming to process one microbatch
- waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5);
- val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, destPath)
- assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
- // Read RO View
- val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
- .load(destPath + "/*/*/*/*")
- assertEquals(100, hoodieROViewDF2.count()) // still 100, since we only updated
-
-
- // Read Incremental View
- // we have 2 commits, try pulling the first commit (which is not the latest)
- val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").get(0)
- val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
- .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
- .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit)
- .load(destPath)
- assertEquals(100, hoodieIncViewDF1.count())
- // 100 initial inserts must be pulled
- var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
- assertEquals(1, countsPerCommit.length)
- assertEquals(firstCommit, countsPerCommit(0).get(0))
-
- // pull the latest commit
- val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
- .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
- .load(destPath)
-
- assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must be pulled
- countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
- assertEquals(1, countsPerCommit.length)
- assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
- }
- Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
- }
-
- @throws[InterruptedException]
- private def waitTillAtleastNCommits(fs: FileSystem, tablePath: String,
- numCommits: Int, timeoutSecs: Int, sleepSecsAfterEachRun: Int): Int = {
- val beginTime = System.currentTimeMillis
- var currTime = beginTime
- val timeoutMsecs = timeoutSecs * 1000
- var numInstants = 0
- var success: Boolean = false
- while ({!success && (currTime - beginTime) < timeoutMsecs}) try {
- val timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, tablePath)
- log.info("Timeline :" + timeline.getInstants.toArray)
- if (timeline.countInstants >= numCommits) {
- numInstants = timeline.countInstants
- success = true
- }
- val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath, true)
- } catch {
- case te: TableNotFoundException =>
- log.info("Got table not found exception. Retrying")
- } finally {
- Thread.sleep(sleepSecsAfterEachRun * 1000)
- currTime = System.currentTimeMillis
- }
- if (!success) {
- throw new IllegalStateException("Timed-out waiting for " + numCommits + " commits to appear in " + tablePath)
- }
- numInstants
- }
-}
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
new file mode 100644
index 0000000..5938ee5
--- /dev/null
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.log4j.LogManager
+import org.apache.spark.sql._
+import org.apache.spark.sql.functions._
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.collection.JavaConversions._
+
+/**
+ * Tests on Spark DataSource for MOR table.
+ */
+class TestMORDataSource extends HoodieClientTestBase {
+
+ var spark: SparkSession = null
+ private val log = LogManager.getLogger(classOf[TestMORDataSource])
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+ DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
+ HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
+ )
+
+ @BeforeEach override def setUp() {
+ initPath()
+ initSparkContexts()
+ spark = sqlContext.sparkSession
+ initTestDataGenerator()
+ initFileSystem()
+ }
+
+ @AfterEach override def tearDown() = {
+ cleanupSparkContexts()
+ cleanupTestDataGenerator()
+ cleanupFileSystem()
+ }
+
+ @Test def testMergeOnReadStorage() {
+
+ val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
+ // Bulk Insert Operation
+ val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+ val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+ inputDF1.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+
+ assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+
+ // Read RO View
+ val hudiRODF1 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+ .load(basePath + "/*/*/*/*")
+ assertEquals(100, hudiRODF1.count()) // 100 records inserted so far, no updates yet
+ val insertCommitTime = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+ val insertCommitTimes = hudiRODF1.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList
+ assertEquals(List(insertCommitTime), insertCommitTimes)
+
+ // Upsert operation
+ val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
+ val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+ inputDF2.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+
+ // Read Snapshot query
+ val updateCommitTime = HoodieDataSourceHelpers.latestCommit(fs, basePath)
+ val hudiSnapshotDF2 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+ .load(basePath + "/*/*/*/*")
+ val updateCommitTimes = hudiSnapshotDF2.select("_hoodie_commit_time").distinct().collectAsList().map(r => r.getString(0)).toList
+ assertEquals(List(updateCommitTime), updateCommitTimes)
+ }
+
+ @Test def testCount() {
+ // First Operation:
+ // Produce parquet files in the three default partitions.
+ // SNAPSHOT view on MOR table with parquet files only.
+ val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+ inputDF1.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+ val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+ .load(basePath + "/*/*/*/*")
+ assertEquals(100, hudiSnapshotDF1.count()) // 100 records inserted so far, no updates yet
+
+ // Second Operation:
+ // Upsert updates that reuse existing record keys in the default partitions. Produces a log file for each parquet file.
+ // SNAPSHOT view should read the log files only with the latest commit time.
+ val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
+ val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+ inputDF2.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ val hudiSnapshotDF2 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+ .load(basePath + "/*/*/*/*")
+ assertEquals(100, hudiSnapshotDF2.count()) // still 100, since we only updated
+ val commit1Time = hudiSnapshotDF1.select("_hoodie_commit_time").head().get(0).toString
+ val commit2Time = hudiSnapshotDF2.select("_hoodie_commit_time").head().get(0).toString
+ assertEquals(hudiSnapshotDF2.select("_hoodie_commit_time").distinct().count(), 1)
+ assertTrue(commit2Time > commit1Time)
+ assertEquals(100, hudiSnapshotDF2.join(hudiSnapshotDF1, Seq("_hoodie_record_key"), "left").count())
+
+ // Unmerge
+ val hudiSnapshotSkipMergeDF2 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+ .option(DataSourceReadOptions.REALTIME_MERGE_OPT_KEY, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
+ .load(basePath + "/*/*/*/*")
+ assertEquals(200, hudiSnapshotSkipMergeDF2.count())
+ assertEquals(100, hudiSnapshotSkipMergeDF2.select("_hoodie_record_key").distinct().count())
+ assertEquals(200, hudiSnapshotSkipMergeDF2.join(hudiSnapshotDF2, Seq("_hoodie_record_key"), "left").count())
+
+ // Test Read Optimized Query on MOR table
+ val hudiRODF2 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
+ .load(basePath + "/*/*/*/*")
+ assertEquals(100, hudiRODF2.count())
+
+ // Third Operation:
+ // Upsert another batch of updates (50 existing record keys) to the default partitions. Produces a second log file for each parquet file.
+ // SNAPSHOT view should read the latest log files.
+ val records3 = recordsToStrings(dataGen.generateUniqueUpdates("003", 50)).toList
+ val inputDF3: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records3, 2))
+ inputDF3.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ val hudiSnapshotDF3 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+ .load(basePath + "/*/*/*/*")
+ // still 100, because we only updated the existing records
+ assertEquals(100, hudiSnapshotDF3.count())
+
+ // 50 from commit2, 50 from commit3
+ assertEquals(hudiSnapshotDF3.select("_hoodie_commit_time").distinct().count(), 2)
+ assertEquals(50, hudiSnapshotDF3.filter(col("_hoodie_commit_time") > commit2Time).count())
+ assertEquals(50,
+ hudiSnapshotDF3.join(hudiSnapshotDF2, Seq("_hoodie_record_key", "_hoodie_commit_time"), "inner").count())
+
+ // Fourth Operation:
+ // Insert records into a new partition. Produces a new parquet file.
+ // SNAPSHOT view should read the latest log files from the default partitions and the parquet file from the new partition.
+ val partitionPaths = new Array[String](1)
+ partitionPaths.update(0, "2020/01/10")
+ val newDataGen = new HoodieTestDataGenerator(partitionPaths)
+ val records4 = recordsToStrings(newDataGen.generateInserts("004", 100)).toList
+ val inputDF4: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records4, 2))
+ inputDF4.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ val hudiSnapshotDF4 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+ .load(basePath + "/*/*/*/*")
+ // 200, because we inserted 100 records into a new partition
+ assertEquals(200, hudiSnapshotDF4.count())
+ assertEquals(100,
+ hudiSnapshotDF1.join(hudiSnapshotDF4, Seq("_hoodie_record_key"), "inner").count())
+
+ // Fifth Operation:
+ // Upsert records into the new partition. Produces a newer version of the parquet file.
+ // SNAPSHOT view should read the latest log files from the default partition
+ // and the latest parquet from the new partition.
+ val records5 = recordsToStrings(newDataGen.generateUpdates("005", 100)).toList
+ val inputDF5: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records5, 2))
+ inputDF5.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ val hudiSnapshotDF5 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+ .load(basePath + "/*/*/*/*")
+ assertEquals(200, hudiSnapshotDF5.count())
+ }
+
+ @Test
+ def testPayloadDelete() {
+ // First Operation:
+ // Produce parquet files in the three default partitions.
+ // SNAPSHOT view on MOR table with parquet files only.
+ val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+ inputDF1.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+ val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+ .load(basePath + "/*/*/*/*")
+ assertEquals(100, hudiSnapshotDF1.count()) // 100 records inserted so far, no updates yet
+
+ // Second Operation:
+ // Upsert 50 delete records
+ // Snapshot view should only read the remaining 50 records
+ val records2 = recordsToStrings(dataGen.generateUniqueDeleteRecords("002", 50)).toList
+ val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+ inputDF2.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ val hudiSnapshotDF2 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+ .load(basePath + "/*/*/*/*")
+ assertEquals(50, hudiSnapshotDF2.count()) // 50 records were deleted
+ assertEquals(hudiSnapshotDF2.select("_hoodie_commit_time").distinct().count(), 1)
+ val commit1Time = hudiSnapshotDF1.select("_hoodie_commit_time").head().get(0).toString
+ val commit2Time = hudiSnapshotDF2.select("_hoodie_commit_time").head().get(0).toString
+ assertTrue(commit1Time.equals(commit2Time))
+ assertEquals(50, hudiSnapshotDF2.join(hudiSnapshotDF1, Seq("_hoodie_record_key"), "left").count())
+
+ // skip-merge query: delete records in the log files are not applied, so all 100 base records are read
+ val hudiSnapshotDF2Unmerge = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+ .option(DataSourceReadOptions.REALTIME_MERGE_OPT_KEY, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
+ .load(basePath + "/*/*/*/*")
+ assertEquals(100, hudiSnapshotDF2Unmerge.count())
+
+ // Third Operation:
+ // Upsert 50 delete records to delete the rest
+ // Snapshot view should read 0 records
+ val records3 = recordsToStrings(dataGen.generateUniqueDeleteRecords("003", 50)).toList
+ val inputDF3: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records3, 2))
+ inputDF3.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ val hudiSnapshotDF3 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+ .load(basePath + "/*/*/*/*")
+ assertEquals(0, hudiSnapshotDF3.count()) // all 100 records were deleted, nothing left to load
+ }
+
+ @Test
+ def testPrunedFiltered() {
+ // First Operation:
+ // Produce parquet files in the three default partitions.
+ // SNAPSHOT view on MOR table with parquet files only.
+ val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+ inputDF1.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+ .load(basePath + "/*/*/*/*")
+ assertEquals(100, hudiSnapshotDF1.count())
+ // select nested columns in an order different from the actual schema
+ assertEquals("amount,currency,tip_history,_hoodie_commit_seqno",
+ hudiSnapshotDF1
+ .select("fare.amount", "fare.currency", "tip_history", "_hoodie_commit_seqno")
+ .orderBy(desc("_hoodie_commit_seqno"))
+ .columns.mkString(","))
+
+ // Second Operation:
+ // Upsert 50 update records
+ // Snapshot view should read 100 records
+ val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 50))
+ .toList
+ val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+ inputDF2.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ val hudiSnapshotDF2 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+ .load(basePath + "/*/*/*/*")
+
+ val commit1Time = hudiSnapshotDF1.select("_hoodie_commit_time").head().get(0).toString
+
+ // filter out the first commit so only records coming from the log files are read
+ assertEquals(50, hudiSnapshotDF2.select("_hoodie_commit_seqno", "fare.amount", "fare.currency", "tip_history")
+ .filter(col("_hoodie_commit_time") > commit1Time).count())
+
+ // select nested columns in an order different from the actual schema
+ assertEquals("amount,currency,tip_history,_hoodie_commit_seqno",
+ hudiSnapshotDF2
+ .select("fare.amount", "fare.currency", "tip_history", "_hoodie_commit_seqno")
+ .orderBy(desc("_hoodie_commit_seqno"))
+ .columns.mkString(","))
+
+ // verify the column types are loaded correctly
+ val sampleRow = hudiSnapshotDF2
+ .select("begin_lat", "current_date", "fare.currency", "tip_history", "nation")
+ .orderBy(desc("_hoodie_commit_time"))
+ .head()
+ assertEquals(sampleRow.getDouble(0), sampleRow.get(0))
+ assertEquals(sampleRow.getLong(1), sampleRow.get(1))
+ assertEquals(sampleRow.getString(2), sampleRow.get(2))
+ assertEquals(sampleRow.getSeq(3), sampleRow.get(3))
+ assertEquals(sampleRow.getStruct(4), sampleRow.get(4))
+
+ // make sure show() works
+ hudiSnapshotDF1.show(1)
+ hudiSnapshotDF2.show(1)
+ }
+
+ @Test
+ def testVectorizedReader() {
+ spark.conf.set("spark.sql.parquet.enableVectorizedReader", true)
+ assertTrue(spark.conf.get("spark.sql.parquet.enableVectorizedReader").toBoolean)
+ // The vectorized reader is only triggered when every column is an AtomicType,
+ // i.e. not null, a UDT, an array, a struct, or a map.
+ val schema = HoodieTestDataGenerator.SHORT_TRIP_SCHEMA
+ val records1 = recordsToStrings(dataGen.generateInsertsAsPerSchema("001", 100, schema)).toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+ inputDF1.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
+ .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+ .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
+ .mode(SaveMode.Overwrite)
+ .save(basePath)
+ val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+ .load(basePath + "/*/*/*/*")
+ assertEquals(100, hudiSnapshotDF1.count())
+
+ val records2 = recordsToStrings(dataGen.generateUniqueUpdatesAsPerSchema("002", 50, schema))
+ .toList
+ val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+ inputDF2.write.format("org.apache.hudi")
+ .options(commonOpts)
+ .mode(SaveMode.Append)
+ .save(basePath)
+ val hudiSnapshotDF2 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
+ .load(basePath + "/*/*/*/*")
+ assertEquals(100, hudiSnapshotDF2.count())
+
+ // verify the column types are loaded correctly
+ val sampleRow = hudiSnapshotDF2
+ .select("fare", "driver", "_hoodie_is_deleted")
+ .head()
+ assertEquals(sampleRow.getDouble(0), sampleRow.get(0))
+ assertEquals(sampleRow.getString(1), sampleRow.get(1))
+ assertEquals(sampleRow.getBoolean(2), sampleRow.get(2))
+
+ // test show()
+ hudiSnapshotDF1.show(1)
+ hudiSnapshotDF2.show(1)
+ }
+}
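For reference, a minimal sketch (not part of this commit) of the three MOR read paths exercised by TestMORDataSource above, using the same option keys; the base path is a placeholder and the MOR table is assumed to already exist:

import org.apache.hudi.DataSourceReadOptions
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hudi-mor-read-sketch").master("local[2]").getOrCreate()
val basePath = "/tmp/hoodie_test" // hypothetical MOR table base path

// Snapshot query (default): merges base parquet files with their log files.
val snapshotDF = spark.read.format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .load(basePath + "/*/*/*/*")

// Snapshot query without merging: base-file records and log records are returned side by side.
val skipMergeDF = spark.read.format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .option(DataSourceReadOptions.REALTIME_MERGE_OPT_KEY, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
  .load(basePath + "/*/*/*/*")

// Read-optimized query: reads only the base parquet files and ignores the log files.
val readOptimizedDF = spark.read.format("org.apache.hudi")
  .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
  .load(basePath + "/*/*/*/*")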
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
new file mode 100644
index 0000000..011573b
--- /dev/null
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestStructuredStreaming.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.TableNotFoundException
+import org.apache.hudi.testutils.HoodieClientTestBase
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.log4j.LogManager
+import org.apache.spark.sql._
+import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.collection.JavaConversions._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future}
+
+/**
+ * Basic tests on the spark datasource for structured streaming sink
+ */
+class TestStructuredStreaming extends HoodieClientTestBase {
+ private val log = LogManager.getLogger(getClass)
+ var spark: SparkSession = null
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "4",
+ "hoodie.upsert.shuffle.parallelism" -> "4",
+ DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
+ DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
+ DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
+ HoodieWriteConfig.TABLE_NAME -> "hoodie_test"
+ )
+
+ @BeforeEach override def setUp() {
+ initPath()
+ initSparkContexts()
+ spark = sqlContext.sparkSession
+ initTestDataGenerator()
+ initFileSystem()
+ }
+
+ @AfterEach override def tearDown() = {
+ cleanupSparkContexts()
+ cleanupTestDataGenerator()
+ cleanupFileSystem()
+ }
+
+ @Test
+ def testStructuredStreaming(): Unit = {
+ fs.delete(new Path(basePath), true)
+ val sourcePath = basePath + "/source"
+ val destPath = basePath + "/dest"
+ fs.mkdirs(new Path(sourcePath))
+
+ // First chunk of data
+ val records1 = recordsToStrings(dataGen.generateInserts("000", 100)).toList
+ val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+
+ // Second chunk of data
+ val records2 = recordsToStrings(dataGen.generateUpdates("001", 100)).toList
+ val inputDF2 = spark.read.json(spark.sparkContext.parallelize(records2, 2))
+ val uniqueKeyCnt = inputDF2.select("_row_key").distinct().count()
+
+ // define the source of streaming
+ val streamingInput =
+ spark.readStream
+ .schema(inputDF1.schema)
+ .json(sourcePath)
+
+ val f1 = Future {
+ println("streaming starting")
+ //'writeStream' can be called only on streaming Dataset/DataFrame
+ streamingInput
+ .writeStream
+ .format("org.apache.hudi")
+ .options(commonOpts)
+ .trigger(new ProcessingTime(100))
+ .option("checkpointLocation", basePath + "/checkpoint")
+ .outputMode(OutputMode.Append)
+ .start(destPath)
+ .awaitTermination(10000)
+ println("streaming ends")
+ }
+
+ val f2 = Future {
+ inputDF1.write.mode(SaveMode.Append).json(sourcePath)
+ // wait for spark streaming to process one microbatch
+ val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5)
+ assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000"))
+ val commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, destPath)
+ // Read RO View
+ val hoodieROViewDF1 = spark.read.format("org.apache.hudi")
+ .load(destPath + "/*/*/*/*")
+ assert(hoodieROViewDF1.count() == 100)
+
+ inputDF2.write.mode(SaveMode.Append).json(sourcePath)
+ // wait for spark streaming to process one microbatch
+ waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5)
+ val commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, destPath)
+ assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
+ // Read RO View
+ val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
+ .load(destPath + "/*/*/*/*")
+ assertEquals(100, hoodieROViewDF2.count()) // still 100, since we only updated
+
+
+ // Read Incremental View
+ // we have 2 commits, try pulling the first commit (which is not the latest)
+ val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").get(0)
+ val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, "000")
+ .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, firstCommit)
+ .load(destPath)
+ assertEquals(100, hoodieIncViewDF1.count())
+ // 100 initial inserts must be pulled
+ var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
+ assertEquals(1, countsPerCommit.length)
+ assertEquals(firstCommit, countsPerCommit(0).get(0))
+
+ // pull the latest commit
+ val hoodieIncViewDF2 = spark.read.format("org.apache.hudi")
+ .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, commitInstantTime1)
+ .load(destPath)
+
+ assertEquals(uniqueKeyCnt, hoodieIncViewDF2.count()) // 100 records must be pulled
+ countsPerCommit = hoodieIncViewDF2.groupBy("_hoodie_commit_time").count().collect()
+ assertEquals(1, countsPerCommit.length)
+ assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
+ }
+ Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
+ }
+
+ @throws[InterruptedException]
+ private def waitTillAtleastNCommits(fs: FileSystem, tablePath: String,
+ numCommits: Int, timeoutSecs: Int, sleepSecsAfterEachRun: Int) = {
+ val beginTime = System.currentTimeMillis
+ var currTime = beginTime
+ val timeoutMsecs = timeoutSecs * 1000
+ var numInstants = 0
+ var success = false
+ while ({!success && (currTime - beginTime) < timeoutMsecs}) try {
+ val timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, tablePath)
+ log.info("Timeline :" + timeline.getInstants.toArray)
+ if (timeline.countInstants >= numCommits) {
+ numInstants = timeline.countInstants
+ success = true
+ }
+ val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath, true)
+ } catch {
+ case te: TableNotFoundException =>
+ log.info("Got table not found exception. Retrying")
+ } finally {
+ Thread.sleep(sleepSecsAfterEachRun * 1000)
+ currTime = System.currentTimeMillis
+ }
+ if (!success) throw new IllegalStateException("Timed-out waiting for " + numCommits + " commits to appear in " + tablePath)
+ numInstants
+ }
+}
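For reference, a minimal sketch (not part of this commit) of the structured streaming sink usage TestStructuredStreaming drives above, with the same write options; the source and destination paths are placeholders and the JSON source directory is assumed to already contain files:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.OutputMode

val spark = SparkSession.builder().appName("hudi-streaming-sketch").master("local[2]").getOrCreate()
val sourcePath = "/tmp/hudi_stream/source" // hypothetical JSON source directory
val destPath = "/tmp/hudi_stream/dest"     // hypothetical Hudi table path

val writeOpts = Map(                       // same write options as the test above
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "_row_key",
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "timestamp",
  HoodieWriteConfig.TABLE_NAME -> "hoodie_test")

// Continuously upsert incoming JSON records into a Hudi table; each microbatch becomes a commit.
spark.readStream
  .schema(spark.read.json(sourcePath).schema) // infer the schema from the files already present
  .json(sourcePath)
  .writeStream
  .format("org.apache.hudi")
  .options(writeOpts)
  .option("checkpointLocation", destPath + "/.checkpoint")
  .outputMode(OutputMode.Append)
  .start(destPath)
  .awaitTermination()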