You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/06/22 23:02:49 UTC

[incubator-iceberg] branch master updated: Spark: Support structured streaming writes (#228)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 57e9bd0  Spark: Support structured streaming writes (#228)
57e9bd0 is described below

commit 57e9bd09b0d3ff0d67d320f232946d150b877a41
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Sun Jun 23 00:02:44 2019 +0100

    Spark: Support structured streaming writes (#228)
---
 build.gradle                                       |   2 +-
 .../apache/iceberg/spark/source/IcebergSource.java |  72 ++++---
 .../iceberg/spark/source/StreamingWriter.java      | 114 ++++++++++
 .../org/apache/iceberg/spark/source/Writer.java    |   6 +-
 .../spark/source/TestStructuredStreaming.java      | 235 +++++++++++++++++++++
 5 files changed, 401 insertions(+), 28 deletions(-)

diff --git a/build.gradle b/build.gradle
index 3ebdae7..2d3204b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -99,7 +99,7 @@ subprojects {
 
     testCompile 'junit:junit:4.12'
     testCompile 'org.slf4j:slf4j-simple:1.7.5'
-    testCompile 'org.mockito:mockito-all:1.10.19'
+    testCompile 'org.mockito:mockito-core:1.10.19'
   }
 
   publishing {
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index 7f41a75..be34947 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -33,19 +33,23 @@ import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.types.CheckCompatibility;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.streaming.StreamExecution;
 import org.apache.spark.sql.sources.DataSourceRegister;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.ReadSupport;
+import org.apache.spark.sql.sources.v2.StreamWriteSupport;
 import org.apache.spark.sql.sources.v2.WriteSupport;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
 import org.apache.spark.sql.types.StructType;
 
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
 
-public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister {
+public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister, StreamWriteSupport {
 
   private SparkSession lazySpark = null;
   private Configuration lazyConf = null;
@@ -65,37 +69,32 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
   }
 
   @Override
-  public Optional<DataSourceWriter> createWriter(String jobId, StructType dfStruct, SaveMode mode,
-                                                   DataSourceOptions options) {
+  public Optional<DataSourceWriter> createWriter(String jobId, StructType dsStruct, SaveMode mode,
+                                                 DataSourceOptions options) {
     Preconditions.checkArgument(mode == SaveMode.Append, "Save mode %s is not supported", mode);
     Configuration conf = new Configuration(lazyBaseConf());
     Table table = getTableAndResolveHadoopConfiguration(options, conf);
-
-    Schema dfSchema = SparkSchemaUtil.convert(table.schema(), dfStruct);
-    List<String> errors = CheckCompatibility.writeCompatibilityErrors(table.schema(), dfSchema);
-    if (!errors.isEmpty()) {
-      StringBuilder sb = new StringBuilder();
-      sb.append("Cannot write incompatible dataframe to table with schema:\n")
-          .append(table.schema()).append("\nProblems:");
-      for (String error : errors) {
-        sb.append("\n* ").append(error);
-      }
-      throw new IllegalArgumentException(sb.toString());
-    }
-
-    Optional<String> formatOption = options.get("write-format");
-    FileFormat format;
-    if (formatOption.isPresent()) {
-      format = FileFormat.valueOf(formatOption.get().toUpperCase(Locale.ENGLISH));
-    } else {
-      format = FileFormat.valueOf(table.properties()
-          .getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT)
-          .toUpperCase(Locale.ENGLISH));
-    }
-
+    validateWriteSchema(table.schema(), dsStruct);
+    FileFormat format = getFileFormat(table.properties(), options);
     return Optional.of(new Writer(table, format));
   }
 
+  @Override
+  public StreamWriter createStreamWriter(String runId, StructType dsStruct,
+                                         OutputMode mode, DataSourceOptions options) {
+    Preconditions.checkArgument(
+        mode == OutputMode.Append() || mode == OutputMode.Complete(),
+        "Output mode %s is not supported", mode);
+    Configuration conf = new Configuration(lazyBaseConf());
+    Table table = getTableAndResolveHadoopConfiguration(options, conf);
+    validateWriteSchema(table.schema(), dsStruct);
+    FileFormat format = getFileFormat(table.properties(), options);
+    // Spark 2.4.x passes runId to createStreamWriter instead of real queryId,
+    // so we fetch it directly from sparkContext to make writes idempotent
+    String queryId = lazySparkSession().sparkContext().getLocalProperty(StreamExecution.QUERY_ID_KEY());
+    return new StreamingWriter(table, format, queryId, mode);
+  }
+
   protected Table findTable(DataSourceOptions options, Configuration conf) {
     Optional<String> location = options.get("path");
     Preconditions.checkArgument(location.isPresent(),
@@ -138,4 +137,25 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
         .filter(key -> key.startsWith("hadoop."))
         .forEach(key -> baseConf.set(key.replaceFirst("hadoop.", ""), options.get(key)));
   }
+
+  private FileFormat getFileFormat(Map<String, String> tableProperties, DataSourceOptions options) {
+    String tableDefault = tableProperties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+    String formatName = options.get("write-format").orElse(tableDefault); // write option wins if set
+    return FileFormat.valueOf(formatName.toUpperCase(Locale.ENGLISH));
+  }
+
+  private void validateWriteSchema(Schema tableSchema, StructType dsStruct) {
+    Schema writeSchema = SparkSchemaUtil.convert(tableSchema, dsStruct);
+    List<String> problems = CheckCompatibility.writeCompatibilityErrors(tableSchema, writeSchema);
+    if (problems.isEmpty()) {
+      return; // no compatibility problems were reported
+    }
+    // report every incompatibility at once, one bullet per problem
+    StringBuilder message = new StringBuilder("Cannot write incompatible dataset to table with schema:\n");
+    message.append(tableSchema).append("\nProblems:");
+    for (String problem : problems) {
+      message.append("\n* ").append(problem);
+    }
+    throw new IllegalArgumentException(message.toString());
+  }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
new file mode 100644
index 0000000..d768025
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.Map;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
+import org.apache.spark.sql.streaming.OutputMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StreamingWriter extends Writer implements StreamWriter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(StreamingWriter.class);
+  private static final String QUERY_ID_PROPERTY = "spark.sql.streaming.queryId";
+  private static final String EPOCH_ID_PROPERTY = "spark.sql.streaming.epochId";
+
+  private final String queryId;
+  private final OutputMode mode;
+
+  StreamingWriter(Table table, FileFormat format, String queryId, OutputMode mode) {
+    super(table, format);
+    this.queryId = queryId;
+    this.mode = mode;
+  }
+
+  @Override
+  public void commit(long epochId, WriterCommitMessage[] messages) {
+    LOG.info("Committing epoch {} for query {} in {} mode", epochId, queryId, mode);
+
+    table().refresh();
+    Long lastEpoch = getLastCommittedEpochId();
+    if (lastEpoch != null && lastEpoch >= epochId) {
+      LOG.info("Skipping epoch {} for query {} as it was already committed", epochId, queryId);
+      return;
+    }
+
+    int fileCount = 0;
+    if (mode != OutputMode.Complete()) {
+      AppendFiles appendFiles = table().newFastAppend();
+      for (DataFile dataFile : files(messages)) {
+        appendFiles.appendFile(dataFile);
+        fileCount++;
+      }
+      LOG.info("Appending {} files to {}", fileCount, table());
+      commit(appendFiles, epochId);
+    } else {
+      // Complete mode: overwrite by a row filter that matches everything, then add the new files
+      OverwriteFiles overwrite = table().newOverwrite();
+      overwrite.overwriteByRowFilter(Expressions.alwaysTrue());
+      for (DataFile dataFile : files(messages)) {
+        overwrite.addFile(dataFile);
+        fileCount++;
+      }
+      LOG.info("Overwriting files in {} with {} new files", table(), fileCount);
+      commit(overwrite, epochId);
+    }
+  }
+
+  private <T> void commit(SnapshotUpdate<T> snapshotUpdate, long epochId) {
+    // these are the properties getLastCommittedEpochId() looks for in snapshot summaries
+    snapshotUpdate.set(QUERY_ID_PROPERTY, queryId);
+    snapshotUpdate.set(EPOCH_ID_PROPERTY, String.valueOf(epochId));
+    long startMillis = System.currentTimeMillis();
+    snapshotUpdate.commit();
+    LOG.info("Committed in {} ms", System.currentTimeMillis() - startMillis);
+  }
+
+  @Override
+  public void abort(long epochId, WriterCommitMessage[] messages) {
+    abort(messages); // delegates to the inherited abort(messages); the epoch id is not used
+  }
+
+  private Long getLastCommittedEpochId() {
+    // walk from the current snapshot back through its ancestors
+    Snapshot current = table().currentSnapshot();
+    while (current != null) {
+      Map<String, String> summary = current.summary();
+      if (queryId.equals(summary.get(QUERY_ID_PROPERTY))) {
+        // the first match walking backwards is this query's most recent commit in the lineage
+        return Long.valueOf(summary.get(EPOCH_ID_PROPERTY));
+      }
+      Long parentId = current.parentId();
+      current = parentId == null ? null : table().snapshot(parentId);
+    }
+    // no snapshot in the current lineage carries this query id
+    return null;
+  }
+}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
index d5c8b3c..617f159 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -125,7 +125,11 @@ class Writer implements DataSourceWriter {
         });
   }
 
-  private Iterable<DataFile> files(WriterCommitMessage[] messages) {
+  protected Table table() { // accessor for the table field, visible to subclasses
+    return table;
+  }
+
+  protected Iterable<DataFile> files(WriterCommitMessage[] messages) {
     if (messages.length > 0) {
       return concat(transform(Arrays.asList(messages), message -> message != null
           ? ImmutableList.copyOf(((TaskCommit) message).files())
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
new file mode 100644
index 0000000..ccd5904
--- /dev/null
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.streaming.MemoryStream;
+import org.apache.spark.sql.streaming.DataStreamWriter;
+import org.apache.spark.sql.streaming.StreamingQuery;
+import org.apache.spark.sql.streaming.StreamingQueryException;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import scala.collection.JavaConversions;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+public class TestStructuredStreaming { // streaming writes via format("iceberg"): append, complete, update
+
+  private static final Configuration CONF = new Configuration();
+  private static final Schema SCHEMA = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get())
+  );
+  private static SparkSession spark = null;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  @Rule
+  public ExpectedException exceptionRule = ExpectedException.none();
+
+  @BeforeClass
+  public static void startSpark() {
+    TestStructuredStreaming.spark = SparkSession.builder()
+        .master("local[2]")
+        .config("spark.sql.shuffle.partitions", 4)
+        .getOrCreate();
+  }
+
+  @AfterClass
+  public static void stopSpark() {
+    SparkSession currentSpark = TestStructuredStreaming.spark;
+    TestStructuredStreaming.spark = null; // clear the shared reference before stopping the session
+    currentSpark.stop();
+  }
+
+  @Test
+  public void testStreamingWriteAppendMode() throws IOException {
+    File parent = temp.newFolder("parquet");
+    File location = new File(parent, "test-table");
+    File checkpoint = new File(parent, "checkpoint");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> expected = Lists.newArrayList( // values of batch1 and batch2, each exactly once
+        new SimpleRecord(1, "1"),
+        new SimpleRecord(2, "2"),
+        new SimpleRecord(3, "3"),
+        new SimpleRecord(4, "4")
+    );
+
+    MemoryStream<Integer> inputStream = new MemoryStream<>(1, spark.sqlContext(), Encoders.INT());
+    DataStreamWriter<Row> streamWriter = inputStream.toDF()
+        .selectExpr("value AS id", "CAST (value AS STRING) AS data")
+        .writeStream()
+        .outputMode("append")
+        .format("iceberg")
+        .option("checkpointLocation", checkpoint.toString())
+        .option("path", location.toString());
+
+    try {
+      // start the original query with checkpointing and feed it two batches
+      StreamingQuery query = streamWriter.start();
+      List<Integer> batch1 = Lists.newArrayList(1, 2);
+      inputStream.addData(JavaConversions.asScalaBuffer(batch1));
+      query.processAllAvailable();
+      List<Integer> batch2 = Lists.newArrayList(3, 4);
+      inputStream.addData(JavaConversions.asScalaBuffer(batch2));
+      query.processAllAvailable();
+      query.stop();
+
+      // remove the last commit file (commits/1) to force Spark to reprocess batch #1
+      File lastCommitFile = new File(checkpoint.toString() + "/commits/1");
+      Assert.assertTrue("The commit file must be deleted", lastCommitFile.delete());
+
+      // restart the query from the checkpoint
+      StreamingQuery restartedQuery = streamWriter.start();
+      restartedQuery.processAllAvailable();
+
+      // ensure the write was idempotent: same rows and still only two snapshots
+      Dataset<Row> result = spark.read()
+          .format("iceberg")
+          .load(location.toString());
+      List<SimpleRecord> actual = result.orderBy("id").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+      Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+      Assert.assertEquals("Result rows should match", expected, actual);
+      Assert.assertEquals("Number of snapshots should match", 2, Iterables.size(table.snapshots()));
+    } finally { // stop any query still active, even if an assertion failed
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @Test
+  public void testStreamingWriteCompleteMode() throws IOException {
+    File parent = temp.newFolder("parquet");
+    File location = new File(parent, "test-table");
+    File checkpoint = new File(parent, "checkpoint");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    Table table = tables.create(SCHEMA, spec, location.toString());
+
+    List<SimpleRecord> expected = Lists.newArrayList( // id = occurrence count, data = value (1: 2x, 2: 3x, 3: 1x)
+        new SimpleRecord(2, "1"),
+        new SimpleRecord(3, "2"),
+        new SimpleRecord(1, "3")
+    );
+
+    MemoryStream<Integer> inputStream = new MemoryStream<>(1, spark.sqlContext(), Encoders.INT());
+    DataStreamWriter<Row> streamWriter = inputStream.toDF()
+        .groupBy("value")
+        .count()
+        .selectExpr("CAST(count AS INT) AS id", "CAST (value AS STRING) AS data")
+        .writeStream()
+        .outputMode("complete")
+        .format("iceberg")
+        .option("checkpointLocation", checkpoint.toString())
+        .option("path", location.toString());
+
+    try {
+      // start the original query with checkpointing and feed it two batches
+      StreamingQuery query = streamWriter.start();
+      List<Integer> batch1 = Lists.newArrayList(1, 2);
+      inputStream.addData(JavaConversions.asScalaBuffer(batch1));
+      query.processAllAvailable();
+      List<Integer> batch2 = Lists.newArrayList(1, 2, 2, 3);
+      inputStream.addData(JavaConversions.asScalaBuffer(batch2));
+      query.processAllAvailable();
+      query.stop();
+
+      // remove the last commit file (commits/1) to force Spark to reprocess batch #1
+      File lastCommitFile = new File(checkpoint.toString() + "/commits/1");
+      Assert.assertTrue("The commit file must be deleted", lastCommitFile.delete());
+
+      // restart the query from the checkpoint
+      StreamingQuery restartedQuery = streamWriter.start();
+      restartedQuery.processAllAvailable();
+
+      // ensure the write was idempotent: same aggregated rows and still only two snapshots
+      Dataset<Row> result = spark.read()
+          .format("iceberg")
+          .load(location.toString());
+      List<SimpleRecord> actual = result.orderBy("data").as(Encoders.bean(SimpleRecord.class)).collectAsList();
+      Assert.assertEquals("Number of rows should match", expected.size(), actual.size());
+      Assert.assertEquals("Result rows should match", expected, actual);
+      Assert.assertEquals("Number of snapshots should match", 2, Iterables.size(table.snapshots()));
+    } finally { // stop any query still active, even if an assertion failed
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+
+  @Test
+  public void testStreamingWriteUpdateMode() throws IOException {
+    exceptionRule.expect(StreamingQueryException.class); // expectations are set before the query is started
+    exceptionRule.expectMessage("Output mode Update is not supported");
+
+    File parent = temp.newFolder("parquet");
+    File location = new File(parent, "test-table");
+    File checkpoint = new File(parent, "checkpoint");
+
+    HadoopTables tables = new HadoopTables(CONF);
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
+    tables.create(SCHEMA, spec, location.toString());
+
+    MemoryStream<Integer> inputStream = new MemoryStream<>(1, spark.sqlContext(), Encoders.INT());
+    DataStreamWriter<Row> streamWriter = inputStream.toDF()
+        .selectExpr("value AS id", "CAST (value AS STRING) AS data")
+        .writeStream()
+        .outputMode("update") // the mode this test expects to be rejected
+        .format("iceberg")
+        .option("checkpointLocation", checkpoint.toString())
+        .option("path", location.toString());
+
+    try {
+      StreamingQuery query = streamWriter.start();
+      List<Integer> batch1 = Lists.newArrayList(1, 2);
+      inputStream.addData(JavaConversions.asScalaBuffer(batch1));
+      query.processAllAvailable();
+    } finally { // stop any query still active, whether or not the expected exception was thrown
+      for (StreamingQuery query : spark.streams().active()) {
+        query.stop();
+      }
+    }
+  }
+}