Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/06/20 01:02:33 UTC
[iceberg] branch master updated: Spark: Move classes that depend on 2.x DSv2 to spark2 (#1122)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 845de7b Spark: Move classes that depend on 2.x DSv2 to spark2 (#1122)
845de7b is described below
commit 845de7bfe78bc8ba3480aad219fcef9f71477023
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Fri Jun 19 18:02:23 2020 -0700
Spark: Move classes that depend on 2.x DSv2 to spark2 (#1122)
---
build.gradle | 13 +
.../iceberg/actions/RewriteDataFilesAction.java | 3 +-
.../iceberg/spark/source/BaseDataReader.java | 10 +-
.../apache/iceberg/spark/source/BaseWriter.java | 143 ++++++
.../iceberg/spark/source/OutputFileFactory.java | 78 +++
.../iceberg/spark/source/PartitionedWriter.java | 68 +++
.../iceberg/spark/source/RowDataRewriter.java | 59 ++-
.../iceberg/spark/source/SparkAppenderFactory.java | 86 ++++
.../spark/source/{Stats.java => TaskResult.java} | 33 +-
.../{Stats.java => UnpartitionedWriter.java} | 27 +-
.../org/apache/iceberg/spark/source/Writer.java | 544 ---------------------
.../org/apache/iceberg/spark/SparkFilters.java | 0
.../apache/iceberg/spark/source/IcebergSource.java | 0
.../org/apache/iceberg/spark/source/Reader.java | 49 +-
.../org/apache/iceberg/spark/source/Stats.java | 0
.../iceberg/spark/source/StreamingWriter.java | 0
.../org/apache/iceberg/spark/source/Writer.java | 282 +++++++++++
...org.apache.spark.sql.sources.DataSourceRegister | 0
.../org/apache/iceberg/spark/SparkTableUtil.scala | 0
.../actions/TestRemoveOrphanFilesAction.java | 0
.../actions/TestRewriteDataFilesAction.java | 0
.../actions/TestRewriteManifestsAction.java | 0
.../apache/iceberg/examples/ConcurrencyTest.java | 0
.../java/org/apache/iceberg/examples/README.md | 0
.../iceberg/examples/ReadAndWriteTablesTest.java | 0
.../iceberg/examples/SchemaEvolutionTest.java | 0
.../org/apache/iceberg/examples/SimpleRecord.java | 0
.../examples/SnapshotFunctionalityTest.java | 0
.../apache/iceberg/spark/TestSparkDataFile.java | 0
.../apache/iceberg/spark/source/LogMessage.java | 0
.../apache/iceberg/spark/source/SimpleRecord.java | 0
.../apache/iceberg/spark/source/TestAvroScan.java | 0
.../iceberg/spark/source/TestDataFrameWrites.java | 0
.../spark/source/TestDataSourceOptions.java | 0
.../iceberg/spark/source/TestFilteredScan.java | 0
.../spark/source/TestForwardCompatibility.java | 0
.../iceberg/spark/source/TestIcebergSource.java | 0
.../source/TestIcebergSourceHadoopTables.java | 0
.../spark/source/TestIcebergSourceHiveTables.java | 0
.../spark/source/TestIcebergSourceTablesBase.java | 0
.../spark/source/TestIdentityPartitionData.java | 0
.../iceberg/spark/source/TestParquetScan.java | 0
.../iceberg/spark/source/TestPartitionValues.java | 0
.../iceberg/spark/source/TestReadProjection.java | 0
.../spark/source/TestSnapshotSelection.java | 0
.../iceberg/spark/source/TestSparkDataWrite.java | 0
.../spark/source/TestSparkReadProjection.java | 0
.../iceberg/spark/source/TestSparkSchema.java | 0
.../iceberg/spark/source/TestSparkTableUtil.java | 0
.../TestSparkTableUtilWithInMemoryCatalog.java | 0
.../spark/source/TestStructuredStreaming.java | 0
.../apache/iceberg/spark/source/TestTables.java | 0
.../spark/source/TestWriteMetricsConfig.java | 0
.../src/test/resources/data/books.json | 0
.../src/test/resources/data/new-books.json | 0
55 files changed, 753 insertions(+), 642 deletions(-)
diff --git a/build.gradle b/build.gradle
index a9da1a7..5510405 100644
--- a/build.gradle
+++ b/build.gradle
@@ -519,6 +519,18 @@ project(':iceberg-spark2') {
testCompile project(path: ':iceberg-hive', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
}
+
+ test {
+ // For vectorized reads
+ // Allow unsafe memory access to avoid the costly check arrow does to check if index is within bounds
+ systemProperty("arrow.enable_unsafe_memory_access", "true")
+ // Disable expensive null check for every get(index) call.
+ // Iceberg manages nullability checks itself instead of relying on arrow.
+ systemProperty("arrow.enable_null_check_for_get", "false")
+
+ // Vectorized reads need more memory
+ maxHeapSize '2500m'
+ }
}
project(':iceberg-spark3') {
@@ -548,6 +560,7 @@ project(':iceberg-spark3') {
testCompile project(path: ':iceberg-hive', configuration: 'testArtifacts')
testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
}
+
test {
// For vectorized reads
// Allow unsafe memory access to avoid the costly check arrow does to check if index is within bounds
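Both Arrow settings above are plain JVM system properties that Arrow reads in static initializers, so the same tuning can be applied from application code as long as it runs before any Arrow classes are loaded. A minimal sketch, assuming a hypothetical helper class (not part of this commit):

public final class ArrowReadFlags {
  private ArrowReadFlags() {
  }

  // Hypothetical helper mirroring the flags set in the Gradle test blocks above.
  public static void apply() {
    // Skip Arrow's per-access bounds check; Iceberg's vectorized reads manage bounds themselves.
    System.setProperty("arrow.enable_unsafe_memory_access", "true");
    // Iceberg performs its own null checks, so Arrow's per-get null check is redundant.
    System.setProperty("arrow.enable_null_check_for_get", "false");
  }
}

Call ArrowReadFlags.apply() once at startup, before the first vectorized read, or pass the same properties as -D JVM options.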
diff --git a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
index e1a7584..0c121a9 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
@@ -228,8 +228,7 @@ public class RewriteDataFilesAction
Broadcast<FileIO> io = sparkContext.broadcast(fileIO);
Broadcast<EncryptionManager> encryption = sparkContext.broadcast(encryptionManager);
- RowDataRewriter rowDataRewriter =
- new RowDataRewriter(table, spec, caseSensitive, io, encryption, targetSizeInBytes);
+ RowDataRewriter rowDataRewriter = new RowDataRewriter(table, spec, caseSensitive, io, encryption);
List<DataFile> addedDataFiles = rowDataRewriter.rewriteDataForTasks(taskRDD);
List<DataFile> currentDataFiles = filteredGroupedTasks.values().stream()
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
index 9ab8bce..93e03aa 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
@@ -19,6 +19,7 @@
package org.apache.iceberg.spark.source;
+import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
@@ -33,15 +34,13 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.spark.rdd.InputFileBlockHolder;
-import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
/**
- * Base class of readers of type {@link InputPartitionReader} to read data as objects of type @param <T>
+ * Base class of Spark readers.
*
* @param <T> is the Java class returned by this reader whose objects contain one or more rows.
*/
-@SuppressWarnings("checkstyle:VisibilityModifier")
-abstract class BaseDataReader<T> implements InputPartitionReader<T> {
+abstract class BaseDataReader<T> implements Closeable {
private final Iterator<FileScanTask> tasks;
private final FileIO fileIo;
private final Map<String, InputFile> inputFiles;
@@ -64,7 +63,6 @@ abstract class BaseDataReader<T> implements InputPartitionReader<T> {
this.currentIterator = CloseableIterator.empty();
}
- @Override
public boolean next() throws IOException {
while (true) {
if (currentIterator.hasNext()) {
@@ -79,14 +77,12 @@ abstract class BaseDataReader<T> implements InputPartitionReader<T> {
}
}
- @Override
public T get() {
return current;
}
abstract CloseableIterator<T> open(FileScanTask task);
- @Override
public void close() throws IOException {
InputFileBlockHolder.unset();
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseWriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseWriter.java
new file mode 100644
index 0000000..f3a1030
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BaseWriter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+abstract class BaseWriter implements Closeable {
+ protected static final int ROWS_DIVISOR = 1000;
+
+ private final List<DataFile> completedFiles = Lists.newArrayList();
+ private final PartitionSpec spec;
+ private final FileFormat format;
+ private final SparkAppenderFactory appenderFactory;
+ private final OutputFileFactory fileFactory;
+ private final FileIO io;
+ private final long targetFileSize;
+ private PartitionKey currentKey = null;
+ private FileAppender<InternalRow> currentAppender = null;
+ private EncryptedOutputFile currentFile = null;
+ private long currentRows = 0;
+
+ BaseWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
+ OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+ this.spec = spec;
+ this.format = format;
+ this.appenderFactory = appenderFactory;
+ this.fileFactory = fileFactory;
+ this.io = io;
+ this.targetFileSize = targetFileSize;
+ }
+
+ public abstract void write(InternalRow row) throws IOException;
+
+ public void writeInternal(InternalRow row) throws IOException {
+ // TODO: ORC cannot enforce the target file size because the appender's length is not available until the file is closed
+ if (!format.equals(FileFormat.ORC) &&
+ currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize) {
+ closeCurrent();
+ openCurrent();
+ }
+
+ currentAppender.add(row);
+ currentRows++;
+ }
+
+ public TaskResult complete() throws IOException {
+ closeCurrent();
+
+ return new TaskResult(completedFiles);
+ }
+
+ public void abort() throws IOException {
+ closeCurrent();
+
+ // clean up files created by this writer
+ Tasks.foreach(completedFiles)
+ .throwFailureWhenFinished()
+ .noRetry()
+ .run(file -> io.deleteFile(file.path().toString()));
+ }
+
+ @Override
+ public void close() throws IOException {
+ closeCurrent();
+ }
+
+ protected void openCurrent() {
+ if (spec.fields().size() == 0) {
+ // unpartitioned
+ currentFile = fileFactory.newOutputFile();
+ } else {
+ // partitioned
+ currentFile = fileFactory.newOutputFile(currentKey);
+ }
+ currentAppender = appenderFactory.newAppender(currentFile.encryptingOutputFile(), format);
+ currentRows = 0;
+ }
+
+ protected void closeCurrent() throws IOException {
+ if (currentAppender != null) {
+ currentAppender.close();
+ // metrics are only valid after the appender is closed
+ Metrics metrics = currentAppender.metrics();
+ long fileSizeInBytes = currentAppender.length();
+ List<Long> splitOffsets = currentAppender.splitOffsets();
+ this.currentAppender = null;
+
+ if (metrics.recordCount() == 0L) {
+ io.deleteFile(currentFile.encryptingOutputFile());
+ } else {
+ DataFile dataFile = DataFiles.builder(spec)
+ .withEncryptionKeyMetadata(currentFile.keyMetadata())
+ .withPath(currentFile.encryptingOutputFile().location())
+ .withFileSizeInBytes(fileSizeInBytes)
+ .withPartition(spec.fields().size() == 0 ? null : currentKey) // set null if unpartitioned
+ .withMetrics(metrics)
+ .withSplitOffsets(splitOffsets)
+ .build();
+ completedFiles.add(dataFile);
+ }
+
+ this.currentFile = null;
+ }
+ }
+
+ protected PartitionKey getCurrentKey() {
+ return currentKey;
+ }
+
+ protected void setCurrentKey(PartitionKey currentKey) {
+ this.currentKey = currentKey;
+ }
+}
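The contract of these writers is: write() every row for the task, then complete() to close the current file and collect the produced DataFiles, or abort() to delete everything the writer created if the task fails. A minimal sketch of that lifecycle, mirroring what RowDataRewriter does later in this commit (the helper class is hypothetical and lives in the same package, since BaseWriter and TaskResult are package-private):

package org.apache.iceberg.spark.source;

import java.io.IOException;
import java.util.Iterator;
import org.apache.iceberg.DataFile;
import org.apache.spark.sql.catalyst.InternalRow;

class WriterLifecycleSketch {
  static DataFile[] writeTask(BaseWriter writer, Iterator<InternalRow> rows) throws IOException {
    try {
      while (rows.hasNext()) {
        writer.write(rows.next());        // may roll to a new file once targetFileSize is reached
      }
      return writer.complete().files();   // closes the open file and returns the completed data files
    } catch (IOException | RuntimeException e) {
      writer.abort();                     // closes and deletes every file this writer created
      throw e;
    }
  }
}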
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/OutputFileFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/OutputFileFactory.java
new file mode 100644
index 0000000..1daf889
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/OutputFileFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.io.OutputFile;
+
+class OutputFileFactory {
+ private final PartitionSpec spec;
+ private final FileFormat format;
+ private final LocationProvider locations;
+ private final FileIO io;
+ private final EncryptionManager encryptionManager;
+ private final int partitionId;
+ private final long taskId;
+ // The purpose of this uuid is to be able to know from two paths that they were written by the same operation.
+ // That's useful, for example, if a Spark job dies and leaves files in the file system, you can identify them all
+ // with a recursive listing and grep.
+ private final String uuid = UUID.randomUUID().toString();
+ private final AtomicInteger fileCount = new AtomicInteger(0);
+
+ OutputFileFactory(PartitionSpec spec, FileFormat format, LocationProvider locations, FileIO io,
+ EncryptionManager encryptionManager, int partitionId, long taskId) {
+ this.spec = spec;
+ this.format = format;
+ this.locations = locations;
+ this.io = io;
+ this.encryptionManager = encryptionManager;
+ this.partitionId = partitionId;
+ this.taskId = taskId;
+ }
+
+ private String generateFilename() {
+ return format.addExtension(
+ String.format("%05d-%d-%s-%05d", partitionId, taskId, uuid, fileCount.incrementAndGet()));
+ }
+
+ /**
+ * Generates EncryptedOutputFile for UnpartitionedWriter.
+ */
+ public EncryptedOutputFile newOutputFile() {
+ OutputFile file = io.newOutputFile(locations.newDataLocation(generateFilename()));
+ return encryptionManager.encrypt(file);
+ }
+
+ /**
+ * Generates EncryptedOutputFile for PartitionedWriter.
+ */
+ public EncryptedOutputFile newOutputFile(PartitionKey key) {
+ String newDataLocation = locations.newDataLocation(spec, key, generateFilename());
+ OutputFile rawOutputFile = io.newOutputFile(newDataLocation);
+ return encryptionManager.encrypt(rawOutputFile);
+ }
+}
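For reference, generateFilename() yields names of the form <partition>-<task>-<uuid>-<count> with the format's extension appended, so the files written by one operation share a uuid and can be found with a recursive listing and grep. A throwaway illustration of the pattern (the literal partition and task ids are examples only):

import java.util.UUID;
import org.apache.iceberg.FileFormat;

class FilenameSketch {
  public static void main(String[] args) {
    String uuid = UUID.randomUUID().toString();
    // Same pattern as OutputFileFactory.generateFilename(): partitionId, taskId, uuid, fileCount.
    String name = FileFormat.PARQUET.addExtension(String.format("%05d-%d-%s-%05d", 2, 17L, uuid, 1));
    System.out.println(name);   // e.g. 00002-17-<uuid>-00001.parquet
  }
}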
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/PartitionedWriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/PartitionedWriter.java
new file mode 100644
index 0000000..7cc0c9f
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/PartitionedWriter.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.Set;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class PartitionedWriter extends BaseWriter {
+ private static final Logger LOG = LoggerFactory.getLogger(PartitionedWriter.class);
+
+ private final PartitionKey key;
+ private final Set<PartitionKey> completedPartitions = Sets.newHashSet();
+
+ PartitionedWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
+ OutputFileFactory fileFactory, FileIO io, long targetFileSize, Schema writeSchema) {
+ super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+ this.key = new PartitionKey(spec, writeSchema);
+ }
+
+ @Override
+ public void write(InternalRow row) throws IOException {
+ key.partition(row);
+
+ PartitionKey currentKey = getCurrentKey();
+ if (!key.equals(currentKey)) {
+ closeCurrent();
+ completedPartitions.add(currentKey);
+
+ if (completedPartitions.contains(key)) {
+ // if rows are not correctly grouped, detect and fail the write
+ PartitionKey existingKey = Iterables.find(completedPartitions, key::equals, null);
+ LOG.warn("Duplicate key: {} == {}", existingKey, key);
+ throw new IllegalStateException("Already closed files for partition: " + key.toPath());
+ }
+
+ setCurrentKey(key.copy());
+ openCurrent();
+ }
+
+ writeInternal(row);
+ }
+}
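PartitionedWriter closes the current file whenever the partition key changes and refuses to reopen a partition it has already completed, so each task must receive its rows clustered by partition key; otherwise the write fails with the IllegalStateException above. One way for callers to satisfy that requirement is to sort within Spark partitions before writing; a sketch, assuming a placeholder partition column named event_date:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class ClusteredWriteSketch {
  // Sketch: cluster rows by the table's partition column(s) so each task emits them grouped.
  static Dataset<Row> clusterByPartition(Dataset<Row> df) {
    return df.sortWithinPartitions("event_date");   // placeholder partition column
  }
}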
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
index 384a95b..2837dbc 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataRewriter.java
@@ -23,22 +23,23 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
+import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,58 +49,70 @@ public class RowDataRewriter implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class);
- private final Broadcast<FileIO> fileIO;
+ private final Schema schema;
+ private final PartitionSpec spec;
+ private final Map<String, String> properties;
+ private final FileFormat format;
+ private final Broadcast<FileIO> io;
private final Broadcast<EncryptionManager> encryptionManager;
- private final String tableSchema;
+ private final LocationProvider locations;
private final String nameMapping;
- private final Writer.WriterFactory writerFactory;
private final boolean caseSensitive;
public RowDataRewriter(Table table, PartitionSpec spec, boolean caseSensitive,
- Broadcast<FileIO> fileIO, Broadcast<EncryptionManager> encryptionManager,
- long targetDataFileSizeInBytes) {
- this.fileIO = fileIO;
+ Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager) {
+ this.schema = table.schema();
+ this.spec = spec;
+ this.locations = table.locationProvider();
+ this.properties = table.properties();
+ this.io = io;
this.encryptionManager = encryptionManager;
this.caseSensitive = caseSensitive;
- this.tableSchema = SchemaParser.toJson(table.schema());
this.nameMapping = table.properties().get(DEFAULT_NAME_MAPPING);
String formatString = table.properties().getOrDefault(
TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
- FileFormat fileFormat = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
- this.writerFactory = new Writer.WriterFactory(spec, fileFormat, table.locationProvider(), table.properties(),
- fileIO, encryptionManager, targetDataFileSizeInBytes, table.schema(), SparkSchemaUtil.convert(table.schema()));
+ this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
}
public List<DataFile> rewriteDataForTasks(JavaRDD<CombinedScanTask> taskRDD) {
- JavaRDD<Writer.TaskCommit> taskCommitRDD = taskRDD.map(this::rewriteDataForTask);
+ JavaRDD<TaskResult> taskCommitRDD = taskRDD.map(this::rewriteDataForTask);
return taskCommitRDD.collect().stream()
.flatMap(taskCommit -> Arrays.stream(taskCommit.files()))
.collect(Collectors.toList());
}
- private Writer.TaskCommit rewriteDataForTask(CombinedScanTask task) throws Exception {
+ private TaskResult rewriteDataForTask(CombinedScanTask task) throws Exception {
TaskContext context = TaskContext.get();
-
- RowDataReader dataReader = new RowDataReader(task, SchemaParser.fromJson(tableSchema),
- SchemaParser.fromJson(tableSchema), nameMapping, fileIO.value(),
- encryptionManager.value(), caseSensitive);
-
int partitionId = context.partitionId();
long taskId = context.taskAttemptId();
- DataWriter<InternalRow> dataWriter = writerFactory.createDataWriter(partitionId, taskId, 0);
+
+ RowDataReader dataReader = new RowDataReader(
+ task, schema, schema, nameMapping, io.value(), encryptionManager.value(), caseSensitive);
+
+ SparkAppenderFactory appenderFactory = new SparkAppenderFactory(
+ properties, schema, SparkSchemaUtil.convert(schema));
+ OutputFileFactory fileFactory = new OutputFileFactory(
+ spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
+
+ BaseWriter writer;
+ if (spec.fields().isEmpty()) {
+ writer = new UnpartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE);
+ } else {
+ writer = new PartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), Long.MAX_VALUE, schema);
+ }
try {
while (dataReader.next()) {
InternalRow row = dataReader.get();
- dataWriter.write(row);
+ writer.write(row);
}
dataReader.close();
dataReader = null;
- return (Writer.TaskCommit) dataWriter.commit();
+ return writer.complete();
} catch (Throwable originalThrowable) {
try {
@@ -111,7 +124,7 @@ public class RowDataRewriter implements Serializable {
if (dataReader != null) {
dataReader.close();
}
- dataWriter.abort();
+ writer.abort();
LOG.error("Aborted commit for partition {} (task {}, attempt {}, stage {}.{})",
partitionId, taskId, context.taskAttemptId(), context.stageId(), context.stageAttemptNumber());
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
new file mode 100644
index 0000000..e29c6eb
--- /dev/null
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.spark.data.SparkAvroWriter;
+import org.apache.iceberg.spark.data.SparkOrcWriter;
+import org.apache.iceberg.spark.data.SparkParquetWriters;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+
+class SparkAppenderFactory {
+ private final Map<String, String> properties;
+ private final Schema writeSchema;
+ private final StructType dsSchema;
+
+ SparkAppenderFactory(Map<String, String> properties, Schema writeSchema, StructType dsSchema) {
+ this.properties = properties;
+ this.writeSchema = writeSchema;
+ this.dsSchema = dsSchema;
+ }
+
+ public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) {
+ MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties);
+ try {
+ switch (fileFormat) {
+ case PARQUET:
+ return Parquet.write(file)
+ .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dsSchema, msgType))
+ .setAll(properties)
+ .metricsConfig(metricsConfig)
+ .schema(writeSchema)
+ .overwrite()
+ .build();
+
+ case AVRO:
+ return Avro.write(file)
+ .createWriterFunc(ignored -> new SparkAvroWriter(dsSchema))
+ .setAll(properties)
+ .schema(writeSchema)
+ .overwrite()
+ .build();
+
+ case ORC:
+ return ORC.write(file)
+ .createWriterFunc(SparkOrcWriter::new)
+ .setAll(properties)
+ .schema(writeSchema)
+ .overwrite()
+ .build();
+
+ default:
+ throw new UnsupportedOperationException("Cannot write unknown format: " + fileFormat);
+ }
+ } catch (IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ }
+}
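SparkAppenderFactory is intentionally small: given the table properties and the write schemas, it returns a format-specific FileAppender for InternalRow. A usage sketch consistent with how RowDataRewriter wires it up in this commit (the table and output file are assumed to exist, and the helper lives in the same package because the factory is package-private):

package org.apache.iceberg.spark.source;

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.catalyst.InternalRow;

class AppenderSketch {
  // Sketch: build a Parquet appender for InternalRow from the table's properties and schema.
  static FileAppender<InternalRow> parquetAppender(Table table, OutputFile outputFile) {
    Schema schema = table.schema();
    SparkAppenderFactory factory =
        new SparkAppenderFactory(table.properties(), schema, SparkSchemaUtil.convert(schema));
    return factory.newAppender(outputFile, FileFormat.PARQUET);
  }
}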
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java b/spark/src/main/java/org/apache/iceberg/spark/source/TaskResult.java
similarity index 63%
copy from spark/src/main/java/org/apache/iceberg/spark/source/Stats.java
copy to spark/src/main/java/org/apache/iceberg/spark/source/TaskResult.java
index 76119c1..1a0b8fc 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/TaskResult.java
@@ -19,25 +19,30 @@
package org.apache.iceberg.spark.source;
-import java.util.OptionalLong;
-import org.apache.spark.sql.sources.v2.reader.Statistics;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.iceberg.DataFile;
-class Stats implements Statistics {
- private final OptionalLong sizeInBytes;
- private final OptionalLong numRows;
+class TaskResult implements Serializable {
+ private final DataFile[] files;
- Stats(long sizeInBytes, long numRows) {
- this.sizeInBytes = OptionalLong.of(sizeInBytes);
- this.numRows = OptionalLong.of(numRows);
+ TaskResult() {
+ this.files = new DataFile[0];
}
- @Override
- public OptionalLong sizeInBytes() {
- return sizeInBytes;
+ TaskResult(DataFile file) {
+ this.files = new DataFile[] { file };
}
- @Override
- public OptionalLong numRows() {
- return numRows;
+ TaskResult(List<DataFile> files) {
+ this.files = files.toArray(new DataFile[files.size()]);
+ }
+
+ TaskResult(DataFile[] files) {
+ this.files = files;
+ }
+
+ DataFile[] files() {
+ return files;
}
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java b/spark/src/main/java/org/apache/iceberg/spark/source/UnpartitionedWriter.java
similarity index 59%
copy from spark/src/main/java/org/apache/iceberg/spark/source/Stats.java
copy to spark/src/main/java/org/apache/iceberg/spark/source/UnpartitionedWriter.java
index 76119c1..5692b6a 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/UnpartitionedWriter.java
@@ -19,25 +19,22 @@
package org.apache.iceberg.spark.source;
-import java.util.OptionalLong;
-import org.apache.spark.sql.sources.v2.reader.Statistics;
+import java.io.IOException;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.io.FileIO;
+import org.apache.spark.sql.catalyst.InternalRow;
-class Stats implements Statistics {
- private final OptionalLong sizeInBytes;
- private final OptionalLong numRows;
+class UnpartitionedWriter extends BaseWriter {
+ UnpartitionedWriter(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
+ OutputFileFactory fileFactory, FileIO io, long targetFileSize) {
+ super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
- Stats(long sizeInBytes, long numRows) {
- this.sizeInBytes = OptionalLong.of(sizeInBytes);
- this.numRows = OptionalLong.of(numRows);
+ openCurrent();
}
@Override
- public OptionalLong sizeInBytes() {
- return sizeInBytes;
- }
-
- @Override
- public OptionalLong numRows() {
- return numRows;
+ public void write(InternalRow row) throws IOException {
+ writeInternal(row);
}
}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
deleted file mode 100644
index ffe16cf..0000000
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
+++ /dev/null
@@ -1,544 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.spark.source;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DataFiles;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Metrics;
-import org.apache.iceberg.MetricsConfig;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.ReplacePartitions;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.SnapshotSummary;
-import org.apache.iceberg.SnapshotUpdate;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
-import org.apache.iceberg.encryption.EncryptionManager;
-import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.LocationProvider;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.spark.data.SparkAvroWriter;
-import org.apache.iceberg.spark.data.SparkOrcWriter;
-import org.apache.iceberg.spark.data.SparkParquetWriters;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.Tasks;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.sources.v2.DataSourceOptions;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
-import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
-import org.apache.spark.sql.types.StructType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
-import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
-import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
-import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
-import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
-import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
-import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
-import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
-
-// TODO: parameterize DataSourceWriter with subclass of WriterCommitMessage
-class Writer implements DataSourceWriter {
- private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
-
- private final Table table;
- private final FileFormat format;
- private final Broadcast<FileIO> io;
- private final Broadcast<EncryptionManager> encryptionManager;
- private final boolean replacePartitions;
- private final String applicationId;
- private final String wapId;
- private final long targetFileSize;
- private final Schema writeSchema;
- private final StructType dsSchema;
-
- Writer(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
- DataSourceOptions options, boolean replacePartitions, String applicationId, Schema writeSchema,
- StructType dsSchema) {
- this(table, io, encryptionManager, options, replacePartitions, applicationId, null, writeSchema, dsSchema);
- }
-
- Writer(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
- DataSourceOptions options, boolean replacePartitions, String applicationId, String wapId,
- Schema writeSchema, StructType dsSchema) {
- this.table = table;
- this.format = getFileFormat(table.properties(), options);
- this.io = io;
- this.encryptionManager = encryptionManager;
- this.replacePartitions = replacePartitions;
- this.applicationId = applicationId;
- this.wapId = wapId;
- this.writeSchema = writeSchema;
- this.dsSchema = dsSchema;
-
- long tableTargetFileSize = PropertyUtil.propertyAsLong(
- table.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
- this.targetFileSize = options.getLong("target-file-size-bytes", tableTargetFileSize);
- }
-
- private FileFormat getFileFormat(Map<String, String> tableProperties, DataSourceOptions options) {
- Optional<String> formatOption = options.get("write-format");
- String formatString = formatOption
- .orElse(tableProperties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT));
- return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
- }
-
- private boolean isWapTable() {
- return Boolean.parseBoolean(table.properties().getOrDefault(
- TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT));
- }
-
- @Override
- public DataWriterFactory<InternalRow> createWriterFactory() {
- return new WriterFactory(
- table.spec(), format, table.locationProvider(), table.properties(), io, encryptionManager, targetFileSize,
- writeSchema, dsSchema);
- }
-
- @Override
- public void commit(WriterCommitMessage[] messages) {
- if (replacePartitions) {
- replacePartitions(messages);
- } else {
- append(messages);
- }
- }
-
- protected void commitOperation(SnapshotUpdate<?> operation, int numFiles, String description) {
- LOG.info("Committing {} with {} files to table {}", description, numFiles, table);
- if (applicationId != null) {
- operation.set("spark.app.id", applicationId);
- }
-
- if (isWapTable() && wapId != null) {
- // write-audit-publish is enabled for this table and job
- // stage the changes without changing the current snapshot
- operation.set(SnapshotSummary.STAGED_WAP_ID_PROP, wapId);
- operation.stageOnly();
- }
-
- long start = System.currentTimeMillis();
- operation.commit(); // abort is automatically called if this fails
- long duration = System.currentTimeMillis() - start;
- LOG.info("Committed in {} ms", duration);
- }
-
- private void append(WriterCommitMessage[] messages) {
- AppendFiles append = table.newAppend();
-
- int numFiles = 0;
- for (DataFile file : files(messages)) {
- numFiles += 1;
- append.appendFile(file);
- }
-
- commitOperation(append, numFiles, "append");
- }
-
- private void replacePartitions(WriterCommitMessage[] messages) {
- ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
-
- int numFiles = 0;
- for (DataFile file : files(messages)) {
- numFiles += 1;
- dynamicOverwrite.addFile(file);
- }
-
- commitOperation(dynamicOverwrite, numFiles, "dynamic partition overwrite");
- }
-
- @Override
- public void abort(WriterCommitMessage[] messages) {
- Tasks.foreach(files(messages))
- .retry(propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
- .exponentialBackoff(
- propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
- propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
- propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
- 2.0 /* exponential */)
- .throwFailureWhenFinished()
- .run(file -> {
- io.value().deleteFile(file.path().toString());
- });
- }
-
- protected Table table() {
- return table;
- }
-
- protected Iterable<DataFile> files(WriterCommitMessage[] messages) {
- if (messages.length > 0) {
- return Iterables.concat(Iterables.transform(Arrays.asList(messages), message -> message != null ?
- ImmutableList.copyOf(((TaskCommit) message).files()) :
- ImmutableList.of()));
- }
- return ImmutableList.of();
- }
-
- private int propertyAsInt(String property, int defaultValue) {
- Map<String, String> properties = table.properties();
- String value = properties.get(property);
- if (value != null) {
- return Integer.parseInt(properties.get(property));
- }
- return defaultValue;
- }
-
- @Override
- public String toString() {
- return String.format("IcebergWrite(table=%s, format=%s)", table, format);
- }
-
-
- static class TaskCommit implements WriterCommitMessage {
- private final DataFile[] files;
-
- TaskCommit() {
- this.files = new DataFile[0];
- }
-
- TaskCommit(DataFile file) {
- this.files = new DataFile[] { file };
- }
-
- TaskCommit(List<DataFile> files) {
- this.files = files.toArray(new DataFile[files.size()]);
- }
-
- DataFile[] files() {
- return files;
- }
- }
-
- static class WriterFactory implements DataWriterFactory<InternalRow> {
- private final PartitionSpec spec;
- private final FileFormat format;
- private final LocationProvider locations;
- private final Map<String, String> properties;
- private final Broadcast<FileIO> io;
- private final Broadcast<EncryptionManager> encryptionManager;
- private final long targetFileSize;
- private final Schema writeSchema;
- private final StructType dsSchema;
-
- WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
- Map<String, String> properties, Broadcast<FileIO> io,
- Broadcast<EncryptionManager> encryptionManager, long targetFileSize,
- Schema writeSchema, StructType dsSchema) {
- this.spec = spec;
- this.format = format;
- this.locations = locations;
- this.properties = properties;
- this.io = io;
- this.encryptionManager = encryptionManager;
- this.targetFileSize = targetFileSize;
- this.writeSchema = writeSchema;
- this.dsSchema = dsSchema;
- }
-
- @Override
- public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
- OutputFileFactory fileFactory = new OutputFileFactory(partitionId, taskId, epochId);
- AppenderFactory<InternalRow> appenderFactory = new SparkAppenderFactory();
-
- if (spec.fields().isEmpty()) {
- return new UnpartitionedWriter(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);
- } else {
- return new PartitionedWriter(
- spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, writeSchema);
- }
- }
-
- private class SparkAppenderFactory implements AppenderFactory<InternalRow> {
- @Override
- public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) {
- MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties);
- try {
- switch (fileFormat) {
- case PARQUET:
- return Parquet.write(file)
- .createWriterFunc(msgType -> SparkParquetWriters.buildWriter(dsSchema, msgType))
- .setAll(properties)
- .metricsConfig(metricsConfig)
- .schema(writeSchema)
- .overwrite()
- .build();
-
- case AVRO:
- return Avro.write(file)
- .createWriterFunc(ignored -> new SparkAvroWriter(dsSchema))
- .setAll(properties)
- .schema(writeSchema)
- .overwrite()
- .build();
-
- case ORC:
- return ORC.write(file)
- .createWriterFunc(SparkOrcWriter::new)
- .setAll(properties)
- .schema(writeSchema)
- .overwrite()
- .build();
-
- default:
- throw new UnsupportedOperationException("Cannot write unknown format: " + fileFormat);
- }
- } catch (IOException e) {
- throw new RuntimeIOException(e);
- }
- }
- }
-
- private class OutputFileFactory {
- private final int partitionId;
- private final long taskId;
- // The purpose of this uuid is to be able to know from two paths that they were written by the same operation.
- // That's useful, for example, if a Spark job dies and leaves files in the file system, you can identify them all
- // with a recursive listing and grep.
- private final String uuid = UUID.randomUUID().toString();
- private int fileCount;
-
- OutputFileFactory(int partitionId, long taskId, long epochId) {
- this.partitionId = partitionId;
- this.taskId = taskId;
- this.fileCount = 0;
- }
-
- private String generateFilename() {
- return format.addExtension(String.format("%05d-%d-%s-%05d", partitionId, taskId, uuid, fileCount++));
- }
-
- /**
- * Generates EncryptedOutputFile for UnpartitionedWriter.
- */
- public EncryptedOutputFile newOutputFile() {
- OutputFile file = io.value().newOutputFile(locations.newDataLocation(generateFilename()));
- return encryptionManager.value().encrypt(file);
- }
-
- /**
- * Generates EncryptedOutputFile for PartitionedWriter.
- */
- public EncryptedOutputFile newOutputFile(PartitionKey key) {
- String newDataLocation = locations.newDataLocation(spec, key, generateFilename());
- OutputFile rawOutputFile = io.value().newOutputFile(newDataLocation);
- return encryptionManager.value().encrypt(rawOutputFile);
- }
- }
- }
-
- private interface AppenderFactory<T> {
- FileAppender<T> newAppender(OutputFile file, FileFormat format);
- }
-
- private abstract static class BaseWriter implements DataWriter<InternalRow> {
- protected static final int ROWS_DIVISOR = 1000;
-
- private final List<DataFile> completedFiles = Lists.newArrayList();
- private final PartitionSpec spec;
- private final FileFormat format;
- private final AppenderFactory<InternalRow> appenderFactory;
- private final WriterFactory.OutputFileFactory fileFactory;
- private final FileIO fileIo;
- private final long targetFileSize;
- private PartitionKey currentKey = null;
- private FileAppender<InternalRow> currentAppender = null;
- private EncryptedOutputFile currentFile = null;
- private long currentRows = 0;
-
- BaseWriter(PartitionSpec spec, FileFormat format, AppenderFactory<InternalRow> appenderFactory,
- WriterFactory.OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) {
- this.spec = spec;
- this.format = format;
- this.appenderFactory = appenderFactory;
- this.fileFactory = fileFactory;
- this.fileIo = fileIo;
- this.targetFileSize = targetFileSize;
- }
-
- @Override
- public abstract void write(InternalRow row) throws IOException;
-
- public void writeInternal(InternalRow row) throws IOException {
- //TODO: ORC file now not support target file size before closed
- if (!format.equals(FileFormat.ORC) &&
- currentRows % ROWS_DIVISOR == 0 && currentAppender.length() >= targetFileSize) {
- closeCurrent();
- openCurrent();
- }
-
- currentAppender.add(row);
- currentRows++;
- }
-
- @Override
- public WriterCommitMessage commit() throws IOException {
- closeCurrent();
-
- return new TaskCommit(completedFiles);
- }
-
- @Override
- public void abort() throws IOException {
- closeCurrent();
-
- // clean up files created by this writer
- Tasks.foreach(completedFiles)
- .throwFailureWhenFinished()
- .noRetry()
- .run(file -> fileIo.deleteFile(file.path().toString()));
- }
-
- protected void openCurrent() {
- if (spec.fields().size() == 0) {
- // unpartitioned
- currentFile = fileFactory.newOutputFile();
- } else {
- // partitioned
- currentFile = fileFactory.newOutputFile(currentKey);
- }
- currentAppender = appenderFactory.newAppender(currentFile.encryptingOutputFile(), format);
- currentRows = 0;
- }
-
- protected void closeCurrent() throws IOException {
- if (currentAppender != null) {
- currentAppender.close();
- // metrics are only valid after the appender is closed
- Metrics metrics = currentAppender.metrics();
- long fileSizeInBytes = currentAppender.length();
- List<Long> splitOffsets = currentAppender.splitOffsets();
- this.currentAppender = null;
-
- if (metrics.recordCount() == 0L) {
- fileIo.deleteFile(currentFile.encryptingOutputFile());
- } else {
- DataFile dataFile = DataFiles.builder(spec)
- .withEncryptionKeyMetadata(currentFile.keyMetadata())
- .withPath(currentFile.encryptingOutputFile().location())
- .withFileSizeInBytes(fileSizeInBytes)
- .withPartition(spec.fields().size() == 0 ? null : currentKey) // set null if unpartitioned
- .withMetrics(metrics)
- .withSplitOffsets(splitOffsets)
- .build();
- completedFiles.add(dataFile);
- }
-
- this.currentFile = null;
- }
- }
-
- protected PartitionKey getCurrentKey() {
- return currentKey;
- }
-
- protected void setCurrentKey(PartitionKey currentKey) {
- this.currentKey = currentKey;
- }
- }
-
- private static class UnpartitionedWriter extends BaseWriter {
- UnpartitionedWriter(
- PartitionSpec spec,
- FileFormat format,
- AppenderFactory<InternalRow> appenderFactory,
- WriterFactory.OutputFileFactory fileFactory,
- FileIO fileIo,
- long targetFileSize) {
- super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
-
- openCurrent();
- }
-
- @Override
- public void write(InternalRow row) throws IOException {
- writeInternal(row);
- }
- }
-
- private static class PartitionedWriter extends BaseWriter {
- private final PartitionKey key;
- private final Set<PartitionKey> completedPartitions = Sets.newHashSet();
-
- PartitionedWriter(
- PartitionSpec spec,
- FileFormat format,
- AppenderFactory<InternalRow> appenderFactory,
- WriterFactory.OutputFileFactory fileFactory,
- FileIO fileIo,
- long targetFileSize,
- Schema writeSchema) {
- super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
- this.key = new PartitionKey(spec, writeSchema);
- }
-
- @Override
- public void write(InternalRow row) throws IOException {
- key.partition(row);
-
- PartitionKey currentKey = getCurrentKey();
- if (!key.equals(currentKey)) {
- closeCurrent();
- completedPartitions.add(currentKey);
-
- if (completedPartitions.contains(key)) {
- // if rows are not correctly grouped, detect and fail the write
- PartitionKey existingKey = Iterables.find(completedPartitions, key::equals, null);
- LOG.warn("Duplicate key: {} == {}", existingKey, key);
- throw new IllegalStateException("Already closed files for partition: " + key.toPath());
- }
-
- setCurrentKey(key.copy());
- openCurrent();
- }
-
- writeInternal(row);
- }
- }
-}
diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java b/spark2/src/main/java/org/apache/iceberg/spark/SparkFilters.java
similarity index 100%
rename from spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java
rename to spark2/src/main/java/org/apache/iceberg/spark/SparkFilters.java
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
similarity index 100%
rename from spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
rename to spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
similarity index 93%
rename from spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
rename to spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
index a9faf00..51859de 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -32,7 +32,6 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
-import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
@@ -44,7 +43,6 @@ import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -64,8 +62,6 @@ import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters;
import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics;
import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.slf4j.Logger;
@@ -396,8 +392,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
private transient Schema tableSchema = null;
private transient Schema expectedSchema = null;
- private transient NameMapping nameMapping = null;
- private transient String[] preferredLocations;
+ private transient String[] preferredLocations = null;
private ReadTask(CombinedScanTask task, String tableSchemaString, String expectedSchemaString,
String nameMappingString, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
@@ -465,7 +460,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
public InputPartitionReader<InternalRow> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
String nameMapping, FileIO io,
EncryptionManager encryptionManager, boolean caseSensitive) {
- return new RowDataReader(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
+ return new RowReader(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
}
}
@@ -480,41 +475,21 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
public InputPartitionReader<ColumnarBatch> create(CombinedScanTask task, Schema tableSchema, Schema expectedSchema,
String nameMapping, FileIO io,
EncryptionManager encryptionManager, boolean caseSensitive) {
- return new BatchDataReader(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, batchSize);
+ return new BatchReader(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, batchSize);
}
}
- private static class StructLikeInternalRow implements StructLike {
- private final DataType[] types;
- private InternalRow row = null;
-
- StructLikeInternalRow(StructType struct) {
- this.types = new DataType[struct.size()];
- StructField[] fields = struct.fields();
- for (int i = 0; i < fields.length; i += 1) {
- types[i] = fields[i].dataType();
- }
- }
-
- public StructLikeInternalRow setRow(InternalRow row) {
- this.row = row;
- return this;
- }
-
- @Override
- public int size() {
- return types.length;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public <T> T get(int pos, Class<T> javaClass) {
- return javaClass.cast(row.get(pos, types[pos]));
+ private static class RowReader extends RowDataReader implements InputPartitionReader<InternalRow> {
+ RowReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, String nameMapping, FileIO io,
+ EncryptionManager encryptionManager, boolean caseSensitive) {
+ super(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
}
+ }
- @Override
- public <T> void set(int pos, T value) {
- throw new UnsupportedOperationException("Not implemented: set");
+ private static class BatchReader extends BatchDataReader implements InputPartitionReader<ColumnarBatch> {
+ BatchReader(CombinedScanTask task, Schema expectedSchema, String nameMapping, FileIO io,
+ EncryptionManager encryptionManager, boolean caseSensitive, int size) {
+ super(task, expectedSchema, nameMapping, io, encryptionManager, caseSensitive, size);
}
}
}
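With the base readers no longer tied to Spark 2's DSv2 types, each Spark module only needs a thin adapter like RowReader and BatchReader above. This commit keeps those adapters in spark2; a Spark 3 module could presumably wrap the same base classes against DSv2's PartitionReader interface. A sketch under that assumption (not part of this commit; it would live in the spark3 source set, in the same package):

package org.apache.iceberg.spark.source;

import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.io.FileIO;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.PartitionReader;

// Sketch only: mirrors RowReader above, but for Spark 3's read interface.
class Spark3RowReader extends RowDataReader implements PartitionReader<InternalRow> {
  Spark3RowReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, String nameMapping,
                  FileIO io, EncryptionManager encryptionManager, boolean caseSensitive) {
    super(task, tableSchema, expectedSchema, nameMapping, io, encryptionManager, caseSensitive);
  }
}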
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Stats.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Stats.java
similarity index 100%
rename from spark/src/main/java/org/apache/iceberg/spark/source/Stats.java
rename to spark2/src/main/java/org/apache/iceberg/spark/source/Stats.java
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java b/spark2/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
similarity index 100%
rename from spark/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
rename to spark2/src/main/java/org/apache/iceberg/spark/source/StreamingWriter.java
diff --git a/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
new file mode 100644
index 0000000..a24820b
--- /dev/null
+++ b/spark2/src/main/java/org/apache/iceberg/spark/source/Writer.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReplacePartitions;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.SnapshotUpdate;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.LocationProvider;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.Tasks;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
+import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
+import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_MAX_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
+import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
+import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
+import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+// TODO: parameterize DataSourceWriter with subclass of WriterCommitMessage
+class Writer implements DataSourceWriter {
+ private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
+
+ private final Table table;
+ private final FileFormat format;
+ private final Broadcast<FileIO> io;
+ private final Broadcast<EncryptionManager> encryptionManager;
+ private final boolean replacePartitions;
+ private final String applicationId;
+ private final String wapId;
+ private final long targetFileSize;
+ private final Schema writeSchema;
+ private final StructType dsSchema;
+
+ Writer(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
+ DataSourceOptions options, boolean replacePartitions, String applicationId, Schema writeSchema,
+ StructType dsSchema) {
+ this(table, io, encryptionManager, options, replacePartitions, applicationId, null, writeSchema, dsSchema);
+ }
+
+ Writer(Table table, Broadcast<FileIO> io, Broadcast<EncryptionManager> encryptionManager,
+ DataSourceOptions options, boolean replacePartitions, String applicationId, String wapId,
+ Schema writeSchema, StructType dsSchema) {
+ this.table = table;
+ this.format = getFileFormat(table.properties(), options);
+ this.io = io;
+ this.encryptionManager = encryptionManager;
+ this.replacePartitions = replacePartitions;
+ this.applicationId = applicationId;
+ this.wapId = wapId;
+ this.writeSchema = writeSchema;
+ this.dsSchema = dsSchema;
+
+ long tableTargetFileSize = PropertyUtil.propertyAsLong(
+ table.properties(), WRITE_TARGET_FILE_SIZE_BYTES, WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+ this.targetFileSize = options.getLong("target-file-size-bytes", tableTargetFileSize);
+ }
+
+ private FileFormat getFileFormat(Map<String, String> tableProperties, DataSourceOptions options) {
+ Optional<String> formatOption = options.get("write-format");
+ String formatString = formatOption
+ .orElse(tableProperties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT));
+ return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+ }
+
+ private boolean isWapTable() {
+ return Boolean.parseBoolean(table.properties().getOrDefault(
+ TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT));
+ }
+
+ @Override
+ public DataWriterFactory<InternalRow> createWriterFactory() {
+ return new WriterFactory(
+ table.spec(), format, table.locationProvider(), table.properties(), io, encryptionManager, targetFileSize,
+ writeSchema, dsSchema);
+ }
+
+ @Override
+ public void commit(WriterCommitMessage[] messages) {
+ if (replacePartitions) {
+ replacePartitions(messages);
+ } else {
+ append(messages);
+ }
+ }
+
+ protected void commitOperation(SnapshotUpdate<?> operation, int numFiles, String description) {
+ LOG.info("Committing {} with {} files to table {}", description, numFiles, table);
+ if (applicationId != null) {
+ operation.set("spark.app.id", applicationId);
+ }
+
+ if (isWapTable() && wapId != null) {
+ // write-audit-publish is enabled for this table and job
+ // stage the changes without changing the current snapshot
+ operation.set(SnapshotSummary.STAGED_WAP_ID_PROP, wapId);
+ operation.stageOnly();
+ }
+
+ long start = System.currentTimeMillis();
+ operation.commit(); // abort is automatically called if this fails
+ long duration = System.currentTimeMillis() - start;
+ LOG.info("Committed in {} ms", duration);
+ }
+
+ private void append(WriterCommitMessage[] messages) {
+ AppendFiles append = table.newAppend();
+
+ int numFiles = 0;
+ for (DataFile file : files(messages)) {
+ numFiles += 1;
+ append.appendFile(file);
+ }
+
+ commitOperation(append, numFiles, "append");
+ }
+
+ private void replacePartitions(WriterCommitMessage[] messages) {
+ ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
+
+ int numFiles = 0;
+ for (DataFile file : files(messages)) {
+ numFiles += 1;
+ dynamicOverwrite.addFile(file);
+ }
+
+ commitOperation(dynamicOverwrite, numFiles, "dynamic partition overwrite");
+ }
+
+ @Override
+ public void abort(WriterCommitMessage[] messages) {
+ Map<String, String> props = table.properties();
+ Tasks.foreach(files(messages))
+ .retry(PropertyUtil.propertyAsInt(props, COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
+ .exponentialBackoff(
+ PropertyUtil.propertyAsInt(props, COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(props, COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+ PropertyUtil.propertyAsInt(props, COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+ 2.0 /* exponential */)
+ .throwFailureWhenFinished()
+ .run(file -> {
+ io.value().deleteFile(file.path().toString());
+ });
+ }
+
+ protected Table table() {
+ return table;
+ }
+
+ protected Iterable<DataFile> files(WriterCommitMessage[] messages) {
+ if (messages.length > 0) {
+ return Iterables.concat(Iterables.transform(Arrays.asList(messages), message -> message != null ?
+ ImmutableList.copyOf(((TaskResult) message).files()) :
+ ImmutableList.of()));
+ }
+ return ImmutableList.of();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("IcebergWrite(table=%s, format=%s)", table, format);
+ }
+
+ private static class TaskCommit extends TaskResult implements WriterCommitMessage {
+ TaskCommit(TaskResult toCopy) {
+ super(toCopy.files());
+ }
+ }
+
+ static class WriterFactory implements DataWriterFactory<InternalRow> {
+ private final PartitionSpec spec;
+ private final FileFormat format;
+ private final LocationProvider locations;
+ private final Map<String, String> properties;
+ private final Broadcast<FileIO> io;
+ private final Broadcast<EncryptionManager> encryptionManager;
+ private final long targetFileSize;
+ private final Schema writeSchema;
+ private final StructType dsSchema;
+
+ WriterFactory(PartitionSpec spec, FileFormat format, LocationProvider locations,
+ Map<String, String> properties, Broadcast<FileIO> io,
+ Broadcast<EncryptionManager> encryptionManager, long targetFileSize,
+ Schema writeSchema, StructType dsSchema) {
+ this.spec = spec;
+ this.format = format;
+ this.locations = locations;
+ this.properties = properties;
+ this.io = io;
+ this.encryptionManager = encryptionManager;
+ this.targetFileSize = targetFileSize;
+ this.writeSchema = writeSchema;
+ this.dsSchema = dsSchema;
+ }
+
+ @Override
+ public DataWriter<InternalRow> createDataWriter(int partitionId, long taskId, long epochId) {
+ OutputFileFactory fileFactory = new OutputFileFactory(
+ spec, format, locations, io.value(), encryptionManager.value(), partitionId, taskId);
+ SparkAppenderFactory appenderFactory = new SparkAppenderFactory(properties, writeSchema, dsSchema);
+
+ if (spec.fields().isEmpty()) {
+ return new Unpartitioned24Writer(spec, format, appenderFactory, fileFactory, io.value(), targetFileSize);
+ } else {
+ return new Partitioned24Writer(
+ spec, format, appenderFactory, fileFactory, io.value(), targetFileSize, writeSchema);
+ }
+ }
+ }
+
+ private static class Unpartitioned24Writer extends UnpartitionedWriter implements DataWriter<InternalRow> {
+ Unpartitioned24Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
+ OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize) {
+ super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize);
+ }
+
+ @Override
+ public WriterCommitMessage commit() throws IOException {
+ return new TaskCommit(complete());
+ }
+ }
+
+ private static class Partitioned24Writer extends PartitionedWriter implements DataWriter<InternalRow> {
+ Partitioned24Writer(PartitionSpec spec, FileFormat format, SparkAppenderFactory appenderFactory,
+ OutputFileFactory fileFactory, FileIO fileIo, long targetFileSize, Schema writeSchema) {
+ super(spec, format, appenderFactory, fileFactory, fileIo, targetFileSize, writeSchema);
+ }
+
+ @Override
+ public WriterCommitMessage commit() throws IOException {
+ return new TaskCommit(complete());
+ }
+ }
+}
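
Editor's note, not part of the commit: the new spark2 Writer above is reached through Spark's DataFrame API, and it reads the "write-format" and "target-file-size-bytes" options shown in its constructor and getFileFormat method. The following is a minimal usage sketch under assumptions not stated in the diff: the table already exists at a hypothetical Hadoop-tables location, and the input Dataset is placeholder data. It is illustrative only, not code from this commit.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class IcebergWriteExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-dsv2-write-sketch")
        .master("local[2]")
        .getOrCreate();

    // Hypothetical input; any Dataset<Row> matching the target table's schema works.
    Dataset<Row> df = spark.range(0, 1000).toDF("id");

    df.write()
        .format("iceberg")                              // resolves to IcebergSource, which constructs the Writer above
        .option("write-format", "parquet")              // read by Writer#getFileFormat; overrides the table's default file format
        .option("target-file-size-bytes", "134217728")  // overrides the write.target-file-size-bytes table property
        .mode("append")                                 // "overwrite" would take the dynamic partition overwrite path instead
        .save("hdfs://namenode/warehouse/db/table");    // hypothetical location of an existing Hadoop table

    spark.stop();
  }
}
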
diff --git a/spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/spark2/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
similarity index 100%
rename from spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
rename to spark2/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
diff --git a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala b/spark2/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
similarity index 100%
rename from spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
rename to spark2/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala
diff --git a/spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java b/spark2/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
rename to spark2/src/test/java/org/apache/iceberg/actions/TestRemoveOrphanFilesAction.java
diff --git a/spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java b/spark2/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
rename to spark2/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
diff --git a/spark/src/test/java/org/apache/iceberg/actions/TestRewriteManifestsAction.java b/spark2/src/test/java/org/apache/iceberg/actions/TestRewriteManifestsAction.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/actions/TestRewriteManifestsAction.java
rename to spark2/src/test/java/org/apache/iceberg/actions/TestRewriteManifestsAction.java
diff --git a/spark/src/test/java/org/apache/iceberg/examples/ConcurrencyTest.java b/spark2/src/test/java/org/apache/iceberg/examples/ConcurrencyTest.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/examples/ConcurrencyTest.java
rename to spark2/src/test/java/org/apache/iceberg/examples/ConcurrencyTest.java
diff --git a/spark/src/test/java/org/apache/iceberg/examples/README.md b/spark2/src/test/java/org/apache/iceberg/examples/README.md
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/examples/README.md
rename to spark2/src/test/java/org/apache/iceberg/examples/README.md
diff --git a/spark/src/test/java/org/apache/iceberg/examples/ReadAndWriteTablesTest.java b/spark2/src/test/java/org/apache/iceberg/examples/ReadAndWriteTablesTest.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/examples/ReadAndWriteTablesTest.java
rename to spark2/src/test/java/org/apache/iceberg/examples/ReadAndWriteTablesTest.java
diff --git a/spark/src/test/java/org/apache/iceberg/examples/SchemaEvolutionTest.java b/spark2/src/test/java/org/apache/iceberg/examples/SchemaEvolutionTest.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/examples/SchemaEvolutionTest.java
rename to spark2/src/test/java/org/apache/iceberg/examples/SchemaEvolutionTest.java
diff --git a/spark/src/test/java/org/apache/iceberg/examples/SimpleRecord.java b/spark2/src/test/java/org/apache/iceberg/examples/SimpleRecord.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/examples/SimpleRecord.java
rename to spark2/src/test/java/org/apache/iceberg/examples/SimpleRecord.java
diff --git a/spark/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java b/spark2/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java
rename to spark2/src/test/java/org/apache/iceberg/examples/SnapshotFunctionalityTest.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java b/spark2/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java
rename to spark2/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/LogMessage.java b/spark2/src/test/java/org/apache/iceberg/spark/source/LogMessage.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/LogMessage.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/LogMessage.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/SimpleRecord.java b/spark2/src/test/java/org/apache/iceberg/spark/source/SimpleRecord.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/SimpleRecord.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/SimpleRecord.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestAvroScan.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestAvroScan.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestAvroScan.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestAvroScan.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWrites.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSource.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSource.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSource.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSource.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestIdentityPartitionData.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestParquetScan.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestReadProjection.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestReadProjection.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestReadProjection.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestReadProjection.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkSchema.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkSchema.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestSparkSchema.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkSchema.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtil.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestTables.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestTables.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestTables.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestTables.java
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
similarity index 100%
rename from spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
rename to spark2/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java
diff --git a/spark/src/test/resources/data/books.json b/spark2/src/test/resources/data/books.json
similarity index 100%
rename from spark/src/test/resources/data/books.json
rename to spark2/src/test/resources/data/books.json
diff --git a/spark/src/test/resources/data/new-books.json b/spark2/src/test/resources/data/new-books.json
similarity index 100%
rename from spark/src/test/resources/data/new-books.json
rename to spark2/src/test/resources/data/new-books.json