Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/06/22 23:02:49 UTC
[incubator-iceberg] branch master updated: Spark: Support structured streaming writes (#228)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 57e9bd0 Spark: Support structured streaming writes (#228)
57e9bd0 is described below
commit 57e9bd09b0d3ff0d67d320f232946d150b877a41
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Sun Jun 23 00:02:44 2019 +0100
Spark: Support structured streaming writes (#228)
---
build.gradle | 2 +-
.../apache/iceberg/spark/source/IcebergSource.java | 72 ++++---
.../iceberg/spark/source/StreamingWriter.java | 114 ++++++++++
.../org/apache/iceberg/spark/source/Writer.java | 6 +-
.../spark/source/TestStructuredStreaming.java | 235 +++++++++++++++++++++
5 files changed, 401 insertions(+), 28 deletions(-)
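For reference, the new TestStructuredStreaming suite below drives the sink through MemoryStream. A standalone usage sketch along the same lines, assuming an existing HadoopTables table and using the built-in rate source in place of MemoryStream (the class name and paths are placeholders, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.hadoop.HadoopTables;
    import org.apache.iceberg.types.Types;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;

    import static org.apache.iceberg.types.Types.NestedField.optional;

    public class StreamingWriteExample {
      public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();

        // the sink writes into an existing table, so create it first
        Schema schema = new Schema(
            optional(1, "id", Types.IntegerType.get()),
            optional(2, "data", Types.StringType.get()));
        String location = "/tmp/iceberg-streaming-table"; // placeholder
        new HadoopTables(new Configuration()).create(
            schema, PartitionSpec.builderFor(schema).identity("data").build(), location);

        // any streaming source works; the tests in this commit use MemoryStream
        Dataset<Row> source = spark.readStream()
            .format("rate")
            .load()
            .selectExpr("CAST(value AS INT) AS id", "CAST(value AS STRING) AS data");

        StreamingQuery query = source.writeStream()
            .format("iceberg")                    // registered by IcebergSource
            .outputMode("append")                 // "complete" is also supported
            .option("checkpointLocation", "/tmp/iceberg-streaming-checkpoint") // placeholder
            .option("path", location)
            .start();

        query.awaitTermination();
      }
    }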
diff --git a/build.gradle b/build.gradle
index 3ebdae7..2d3204b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -99,7 +99,7 @@ subprojects {
testCompile 'junit:junit:4.12'
testCompile 'org.slf4j:slf4j-simple:1.7.5'
- testCompile 'org.mockito:mockito-all:1.10.19'
+ testCompile 'org.mockito:mockito-core:1.10.19'
}
publishing {
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 7f41a75..be34947 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -33,19 +33,23 @@ import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.CheckCompatibility;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.streaming.StreamExecution;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.StreamWriteSupport;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
-public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister {
+public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister, StreamWriteSupport {
private SparkSession lazySpark = null;
private Configuration lazyConf = null;
@@ -65,37 +69,32 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister {
}
@Override
- public Optional<DataSourceWriter> createWriter(String jobId, StructType dfStruct, SaveMode mode,
- DataSourceOptions options) {
+ public Optional<DataSourceWriter> createWriter(String jobId, StructType dsStruct, SaveMode mode,
+ DataSourceOptions options) {
Preconditions.checkArgument(mode == SaveMode.Append, "Save mode %s is not supported", mode);
Configuration conf = new Configuration(lazyBaseConf());
Table table = getTableAndResolveHadoopConfiguration(options, conf);
-
- Schema dfSchema = SparkSchemaUtil.convert(table.schema(), dfStruct);
- List<String> errors = CheckCompatibility.writeCompatibilityErrors(table.schema(), dfSchema);
- if (!errors.isEmpty()) {
- StringBuilder sb = new StringBuilder();
- sb.append("Cannot write incompatible dataframe to table with schema:\n")
- .append(table.schema()).append("\nProblems:");
- for (String error : errors) {
- sb.append("\n* ").append(error);
- }
- throw new IllegalArgumentException(sb.toString());
- }
-
- Optional<String> formatOption = options.get("write-format");
- FileFormat format;
- if (formatOption.isPresent()) {
- format = FileFormat.valueOf(formatOption.get().toUpperCase(Locale.ENGLISH));
- } else {
- format = FileFormat.valueOf(table.properties()
- .getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT)
- .toUpperCase(Locale.ENGLISH));
- }
-
+ validateWriteSchema(table.schema(), dsStruct);
+ FileFormat format = getFileFormat(table.properties(), options);
return Optional.of(new Writer(table, format));
}
+ @Override
+ public StreamWriter createStreamWriter(String runId, StructType dsStruct,
+ OutputMode mode, DataSourceOptions options) {
+ Preconditions.checkArgument(
+ mode == OutputMode.Append() || mode == OutputMode.Complete(),
+ "Output mode %s is not supported", mode);
+ Configuration conf = new Configuration(lazyBaseConf());
+ Table table = getTableAndResolveHadoopConfiguration(options, conf);
+ validateWriteSchema(table.schema(), dsStruct);
+ FileFormat format = getFileFormat(table.properties(), options);
+ // Spark 2.4.x passes runId to createStreamWriter instead of real queryId,
+ // so we fetch it directly from sparkContext to make writes idempotent
+ String queryId = lazySparkSession().sparkContext().getLocalProperty(StreamExecution.QUERY_ID_KEY());
+ return new StreamingWriter(table, format, queryId, mode);
+ }
+
protected Table findTable(DataSourceOptions options, Configuration conf) {
Optional<String> location = options.get("path");
Preconditions.checkArgument(location.isPresent(),
@@ -138,4 +137,25 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister {
.filter(key -> key.startsWith("hadoop."))
.forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key)));
}
+
+ private FileFormat getFileFormat(Map<String, String> tableProperties, DataSourceOptions options) {
+ Optional<String> formatOption = options.get("write-format");
+ String format = formatOption.orElse(tableProperties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT));
+ return FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+ }
+
+ private void validateWriteSchema(Schema tableSchema, StructType dsStruct) {
+ Schema dsSchema = SparkSchemaUtil.convert(tableSchema, dsStruct);
+ List<String> errors = CheckCompatibility.writeCompatibilityErrors(tableSchema, dsSchema);
+ if (!errors.isEmpty()) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Cannot write incompatible dataset to table with schema:\n")
+ .append(tableSchema)
+ .append("\nProblems:");
+ for (String error : errors) {
+ sb.append("\n* ").append(error);
+ }
+ throw new IllegalArgumentException(sb.toString());
+ }
+ }
}
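One practical consequence of the extracted getFileFormat() helper above is that the batch and streaming writers now resolve the file format the same way: an explicit "write-format" option on the writer wins, otherwise the table's write.format.default property (Parquet by default) is used. A hedged sketch, reusing the `source` dataset and placeholder paths from the example above:

    // pick Avro for this query only; other writers keep the table default
    source.writeStream()
        .format("iceberg")
        .outputMode("append")
        .option("write-format", "avro")
        .option("checkpointLocation", "/tmp/iceberg-streaming-checkpoint") // placeholder
        .option("path", "/tmp/iceberg-streaming-table")                    // placeholder
        .start();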
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
new file mode 100644
index 0000000..d768025
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamingWriter extends Writer implements StreamWriter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StreamingWriter.class);
+ private static final String QUERY_ID_PROPERTY = "spark.sql.streaming.queryId";
+ private static final String EPOCH_ID_PROPERTY = "spark.sql.streaming.epochId";
+
+ private final String queryId;
+ private final OutputMode mode;
+
+ StreamingWriter(Table table, FileFormat format, String queryId, OutputMode mode) {
+ super(table, format);
+ this.queryId = queryId;
+ this.mode = mode;
+ }
+
+ @Override
+ public void commit(long epochId, WriterCommitMessage[] messages) {
+ LOG.info("Committing epoch {} for query {} in {} mode", epochId, queryId, mode);
+
+ table().refresh();
+ Long lastCommittedEpochId = getLastCommittedEpochId();
+ if (lastCommittedEpochId != null && epochId <= lastCommittedEpochId) {
+ LOG.info("Skipping epoch {} for query {} as it was already committed", epochId, queryId);
+ return;
+ }
+
+ if (mode == OutputMode.Complete()) {
+ OverwriteFiles overwriteFiles = table().newOverwrite();
+ overwriteFiles.overwriteByRowFilter(Expressions.alwaysTrue());
+ int numFiles = 0;
+ for (DataFile file : files(messages)) {
+ overwriteFiles.addFile(file);
+ numFiles++;
+ }
+ LOG.info("Overwriting files in {} with {} new files", table(), numFiles);
+ commit(overwriteFiles, epochId);
+ } else {
+ AppendFiles append = table().newFastAppend();
+ int numFiles = 0;
+ for (DataFile file : files(messages)) {
+ append.appendFile(file);
+ numFiles++;
+ }
+ LOG.info("Appending {} files to {}", numFiles, table());
+ commit(append, epochId);
+ }
+ }
+
+ private <T> void commit(SnapshotUpdate<T> snapshotUpdate, long epochId) {
+ snapshotUpdate.set(QUERY_ID_PROPERTY, queryId);
+ snapshotUpdate.set(EPOCH_ID_PROPERTY, Long.toString(epochId));
+ long start = System.currentTimeMillis();
+ snapshotUpdate.commit();
+ long duration = System.currentTimeMillis() - start;
+ LOG.info("Committed in {} ms", duration);
+ }
+
+ @Override
+ public void abort(long epochId, WriterCommitMessage[] messages) {
+ abort(messages);
+ }
+
+ private Long getLastCommittedEpochId() {
+ Snapshot snapshot = table().currentSnapshot();
+ Long lastCommittedEpochId = null;
+ while (snapshot != null) {
+ Map<String, String> summary = snapshot.summary();
+ String snapshotQueryId = summary.get(QUERY_ID_PROPERTY);
+ if (queryId.equals(snapshotQueryId)) {
+ lastCommittedEpochId = Long.valueOf(summary.get(EPOCH_ID_PROPERTY));
+ break;
+ }
+ Long parentSnapshotId = snapshot.parentId();
+ snapshot = parentSnapshotId != null ? table().snapshot(parentSnapshotId) : null;
+ }
+ return lastCommittedEpochId;
+ }
+}
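To make restarts idempotent, StreamingWriter stamps every snapshot it commits with the query id and epoch id (the spark.sql.streaming.queryId and spark.sql.streaming.epochId summary properties) and, before committing, walks back through parent snapshots to find the last epoch already committed for that query, skipping any replayed epoch. A small verification sketch (not part of the commit, table location is the placeholder from the earlier example) that reads those properties back from the snapshot summaries:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.hadoop.HadoopTables;

    public class ShowCommittedEpochs {
      public static void main(String[] args) {
        Table table = new HadoopTables(new Configuration()).load("/tmp/iceberg-streaming-table");
        for (Snapshot snapshot : table.snapshots()) {
          // each streaming commit carries the query id and epoch id in its summary
          System.out.printf("snapshot %d: queryId=%s, epochId=%s%n",
              snapshot.snapshotId(),
              snapshot.summary().get("spark.sql.streaming.queryId"),
              snapshot.summary().get("spark.sql.streaming.epochId"));
        }
      }
    }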
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index d5c8b3c..617f159 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -125,7 +125,11 @@ class Writer implements DataSourceWriter {
});
}
- private Iterable<DataFile> files(WriterCommitMessage[] messages) {
+ protected Table table() {
+ return table;
+ }
+
+ protected Iterable<DataFile> files(WriterCommitMessage[] messages) {
if (messages.length > 0) {
return concat(transform(Arrays.asList(messages), message -> message != null
? ImmutableList.copyOf(((TaskCommit) message).files())
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
new file mode 100644
index 0000000..ccd5904
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.streaming.MemoryStream;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import scala.collection.JavaConversions;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestStructuredStreaming {
+
+ private static final Configuration CONF = new Configuration();
+ private static final Schema SCHEMA = new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(2, "data", Types.StringType.get())
+ );
+ private static SparkSession spark = null;
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+ @Rule
+ public ExpectedException exceptionRule = ExpectedException.none();
+
+ @BeforeClass
+ public static void startSpark() {
+ TestStructuredStreaming.spark = SparkSession.builder()
+ .master("local[2]")
+ .config("spark.sql.shuffle.partitions", 4)
+ .getOrCreate();
+ }
+
+ @AfterClass
+ public static void stopSpark() {
+ SparkSession currentSpark = TestStructuredStreaming.spark;
+ TestStructuredStreaming.spark = null;
+ currentSpark.stop();
+ }
+
+ @Test
+ public void testStreamingWriteAppendMode() throws IOException {
+ File parent = temp.newFolder("parquet");
+ File location = new File(parent, "test-table");
+ File checkpoint = new File(parent, "checkpoint");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<SimpleRecord> expected = Lists.newArrayList(
+ new SimpleRecord(1, "1"),
+ new SimpleRecord(2, "2"),
+ new SimpleRecord(3, "3"),
+ new SimpleRecord(4, "4")
+ );
+
+ MemoryStream<Integer> inputStream = new MemoryStream<>(1, spark.sqlContext(), Encoders.INT());
+ DataStreamWriter<Row> streamWriter = inputStream.toDF()
+ .selectExpr("value AS id", "CAST (value AS STRING) AS data")
+ .writeStream()
+ .outputMode("append")
+ .format("iceberg")
+ .option("checkpointLocation", checkpoint.toString())
+ .option("path", location.toString());
+
+ try {
+ // start the original query with checkpointing
+ StreamingQuery query = streamWriter.start();
+ List<Integer> batch1 = Lists.newArrayList(1, 2);
+ inputStream.addData(JavaConversions.asScalaBuffer(batch1));
+ query.processAllAvailable();
+ List<Integer> batch2 = Lists.newArrayList(3, 4);
+ inputStream.addData(JavaConversions.asScalaBuffer(batch2));
+ query.processAllAvailable();
+ query.stop();
+
+ // remove the last commit to force Spark to reprocess batch #1
+ File lastCommitFile = new File(checkpoint.toString() + "/commits/1");
+ Assert.assertTrue("The commit file must be deleted", lastCommitFile.delete());
+
+ // restart the query from the checkpoint
+ StreamingQuery restartedQuery = streamWriter.start();
+ restartedQuery.processAllAvailable();
+
+ // ensure the write was idempotent
+ Dataset<Row> result = spark.read()
+ .format("iceberg")
+ .load(location.toString());
+ List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+ Assert.assertEquals("Result rows should match", expected, actual);
+ Assert.assertEquals("Number of snapshots should match", 2, Iterables.size(table.snapshots()));
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @Test
+ public void testStreamingWriteCompleteMode() throws IOException {
+ File parent = temp.newFolder("parquet");
+ File location = new File(parent, "test-table");
+ File checkpoint = new File(parent, "checkpoint");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ Table table = tables.create(SCHEMA, spec, location.toString());
+
+ List<SimpleRecord> expected = Lists.newArrayList(
+ new SimpleRecord(2, "1"),
+ new SimpleRecord(3, "2"),
+ new SimpleRecord(1, "3")
+ );
+
+ MemoryStream<Integer> inputStream = new MemoryStream<>(1, spark.sqlContext(), Encoders.INT());
+ DataStreamWriter<Row> streamWriter = inputStream.toDF()
+ .groupBy("value")
+ .count()
+ .selectExpr("CAST(count AS INT) AS id", "CAST (value AS STRING) AS data")
+ .writeStream()
+ .outputMode("complete")
+ .format("iceberg")
+ .option("checkpointLocation", checkpoint.toString())
+ .option("path", location.toString());
+
+ try {
+ // start the original query with checkpointing
+ StreamingQuery query = streamWriter.start();
+ List<Integer> batch1 = Lists.newArrayList(1, 2);
+ inputStream.addData(JavaConversions.asScalaBuffer(batch1));
+ query.processAllAvailable();
+ List<Integer> batch2 = Lists.newArrayList(1, 2, 2, 3);
+ inputStream.addData(JavaConversions.asScalaBuffer(batch2));
+ query.processAllAvailable();
+ query.stop();
+
+ // remove the last commit to force Spark to reprocess batch #1
+ File lastCommitFile = new File(checkpoint.toString() + "/commits/1");
+ Assert.assertTrue("The commit file must be deleted", lastCommitFile.delete());
+
+ // restart the query from the checkpoint
+ StreamingQuery restartedQuery = streamWriter.start();
+ restartedQuery.processAllAvailable();
+
+ // ensure the write was idempotent
+ Dataset<Row> result = spark.read()
+ .format("iceberg")
+ .load(location.toString());
+ List<SimpleRecord> actual = result.orderBy("data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+ Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+ Assert.assertEquals("Result rows should match", expected, actual);
+ Assert.assertEquals("Number of snapshots should match", 2, Iterables.size(table.snapshots()));
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+
+ @Test
+ public void testStreamingWriteUpdateMode() throws IOException {
+ exceptionRule.expect(StreamingQueryException.class);
+ exceptionRule.expectMessage("Output mode Update is not supported");
+
+ File parent = temp.newFolder("parquet");
+ File location = new File(parent, "test-table");
+ File checkpoint = new File(parent, "checkpoint");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+ tables.create(SCHEMA, spec, location.toString());
+
+ MemoryStream<Integer> inputStream = new MemoryStream<>(1, spark.sqlContext(), Encoders.INT());
+ DataStreamWriter<Row> streamWriter = inputStream.toDF()
+ .selectExpr("value AS id", "CAST (value AS STRING) AS data")
+ .writeStream()
+ .outputMode("update")
+ .format("iceberg")
+ .option("checkpointLocation", checkpoint.toString())
+ .option("path", location.toString());
+
+ try {
+ StreamingQuery query = streamWriter.start();
+ List<Integer> batch1 = Lists.newArrayList(1, 2);
+ inputStream.addData(JavaConversions.asScalaBuffer(batch1));
+ query.processAllAvailable();
+ } finally {
+ for (StreamingQuery query : spark.streams().active()) {
+ query.stop();
+ }
+ }
+ }
+}