You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by st...@apache.org on 2023/05/23 03:33:27 UTC
[iceberg] branch master updated: Flink: Port #7171 to 1.15 (#7679)
This is an automated email from the ASF dual-hosted git repository.
stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 53683f87fb Flink: Port #7171 to 1.15 (#7679)
53683f87fb is described below
commit 53683f87fb0f1dab12358f826e6f8a7b1ce13b25
Author: Xianyang Liu <li...@hotmail.com>
AuthorDate: Tue May 23 11:33:20 2023 +0800
Flink: Port #7171 to 1.15 (#7679)
Co-authored-by: xianyangliu <xi...@tencent.com>
---
.../iceberg/flink/sink/FlinkManifestUtil.java | 20 ++-
.../org/apache/iceberg/flink/sink/FlinkSink.java | 3 +-
.../iceberg/flink/sink/IcebergFilesCommitter.java | 15 ++-
.../org/apache/iceberg/flink/SimpleDataUtil.java | 29 +++-
.../iceberg/flink/sink/TestFlinkManifest.java | 9 +-
.../flink/sink/TestIcebergFilesCommitter.java | 148 ++++++++++++++++++++-
6 files changed, 200 insertions(+), 24 deletions(-)
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
index 25badc372a..00d55f937c 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink.sink;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.function.Supplier;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
@@ -54,8 +55,10 @@ class FlinkManifestUtil {
return writer.toManifestFile();
}
- static List<DataFile> readDataFiles(ManifestFile manifestFile, FileIO io) throws IOException {
- try (CloseableIterable<DataFile> dataFiles = ManifestFiles.read(manifestFile, io)) {
+ static List<DataFile> readDataFiles(
+ ManifestFile manifestFile, FileIO io, Map<Integer, PartitionSpec> specsById)
+ throws IOException {
+ try (CloseableIterable<DataFile> dataFiles = ManifestFiles.read(manifestFile, io, specsById)) {
return Lists.newArrayList(dataFiles);
}
}
@@ -73,6 +76,12 @@ class FlinkManifestUtil {
attemptNumber);
}
+ /**
+ * Write the {@link WriteResult} to temporary manifest files.
+ *
+ * @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same
+ * partition spec
+ */
static DeltaManifests writeCompletedFiles(
WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
throws IOException {
@@ -104,19 +113,20 @@ class FlinkManifestUtil {
return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles());
}
- static WriteResult readCompletedFiles(DeltaManifests deltaManifests, FileIO io)
+ static WriteResult readCompletedFiles(
+ DeltaManifests deltaManifests, FileIO io, Map<Integer, PartitionSpec> specsById)
throws IOException {
WriteResult.Builder builder = WriteResult.builder();
// Read the completed data files from persisted data manifest file.
if (deltaManifests.dataManifest() != null) {
- builder.addDataFiles(readDataFiles(deltaManifests.dataManifest(), io));
+ builder.addDataFiles(readDataFiles(deltaManifests.dataManifest(), io, specsById));
}
// Read the completed delete files from persisted delete manifests file.
if (deltaManifests.deleteManifest() != null) {
try (CloseableIterable<DeleteFile> deleteFiles =
- ManifestFiles.readDeleteManifest(deltaManifests.deleteManifest(), io, null)) {
+ ManifestFiles.readDeleteManifest(deltaManifests.deleteManifest(), io, specsById)) {
builder.addDeleteFiles(deleteFiles);
}
}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
index 46a229d4c8..844efd6ca6 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -428,7 +428,8 @@ public class FlinkSink {
flinkWriteConf.overwriteMode(),
snapshotProperties,
flinkWriteConf.workerPoolSize(),
- flinkWriteConf.branch());
+ flinkWriteConf.branch(),
+ table.spec());
SingleOutputStreamOperator<Void> committerStream =
writerStream
.transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index e11975b3ef..3805ab2984 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -41,6 +41,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
@@ -48,6 +49,7 @@ import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
@@ -120,6 +122,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
private final Integer workerPoolSize;
+ private final PartitionSpec spec;
private transient ExecutorService workerPool;
IcebergFilesCommitter(
@@ -127,12 +130,14 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
boolean replacePartitions,
Map<String, String> snapshotProperties,
Integer workerPoolSize,
- String branch) {
+ String branch,
+ PartitionSpec spec) {
this.tableLoader = tableLoader;
this.replacePartitions = replacePartitions;
this.snapshotProperties = snapshotProperties;
this.workerPoolSize = workerPoolSize;
this.branch = branch;
+ this.spec = spec;
}
@Override
@@ -263,7 +268,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
SimpleVersionedSerialization.readVersionAndDeSerialize(
DeltaManifestsSerializer.INSTANCE, e.getValue());
pendingResults.put(
- e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io()));
+ e.getKey(),
+ FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs()));
manifests.addAll(deltaManifests.manifests());
}
@@ -443,7 +449,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build();
DeltaManifests deltaManifests =
FlinkManifestUtil.writeCompletedFiles(
- result, () -> manifestOutputFileFactory.create(checkpointId), table.spec());
+ result, () -> manifestOutputFileFactory.create(checkpointId), spec);
return SimpleVersionedSerialization.writeVersionAndSerialize(
DeltaManifestsSerializer.INSTANCE, deltaManifests);
@@ -469,7 +475,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
}
}
- private static ListStateDescriptor<SortedMap<Long, byte[]>> buildStateDescriptor() {
+ @VisibleForTesting
+ static ListStateDescriptor<SortedMap<Long, byte[]>> buildStateDescriptor() {
Comparator<Long> longComparator = Comparators.forType(Types.LongType.get());
// Construct a SortedMapTypeInfo.
SortedMapTypeInfo<Long, byte[]> sortedMapTypeInfo =
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 345d88a48a..408c281004 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -44,6 +44,7 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.data.GenericRecord;
@@ -134,6 +135,20 @@ public class SimpleDataUtil {
String filename,
List<RowData> rows)
throws IOException {
+ return writeFile(table, schema, spec, conf, location, filename, rows, null);
+ }
+
+ /** Write the list of {@link RowData} to the given path and with the given partition data */
+ public static DataFile writeFile(
+ Table table,
+ Schema schema,
+ PartitionSpec spec,
+ Configuration conf,
+ String location,
+ String filename,
+ List<RowData> rows,
+ StructLike partition)
+ throws IOException {
Path path = new Path(location, filename);
FileFormat fileFormat = FileFormat.fromFileName(filename);
Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename);
@@ -148,10 +163,16 @@ public class SimpleDataUtil {
closeableAppender.addAll(rows);
}
- return DataFiles.builder(spec)
- .withInputFile(HadoopInputFile.fromPath(path, conf))
- .withMetrics(appender.metrics())
- .build();
+ DataFiles.Builder builder =
+ DataFiles.builder(spec)
+ .withInputFile(HadoopInputFile.fromPath(path, conf))
+ .withMetrics(appender.metrics());
+
+ if (partition != null) {
+ builder = builder.withPartition(partition);
+ }
+
+ return builder.build();
}
public static DeleteFile writeEqDeleteFile(
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
index 3680142010..f171485a90 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkManifest.java
@@ -110,7 +110,8 @@ public class TestFlinkManifest {
() -> factory.create(curCkpId),
table.spec());
- WriteResult result = FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io());
+ WriteResult result =
+ FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs());
Assert.assertEquals("Size of data file list are not equal.", 10, result.deleteFiles().length);
for (int i = 0; i < dataFiles.size(); i++) {
TestHelpers.assertEquals(dataFiles.get(i), result.dataFiles()[i]);
@@ -157,7 +158,8 @@ public class TestFlinkManifest {
userProvidedFolder.toPath(),
Paths.get(deltaManifests.dataManifest().path()).getParent());
- WriteResult result = FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io());
+ WriteResult result =
+ FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs());
Assert.assertEquals(0, result.deleteFiles().length);
Assert.assertEquals(5, result.dataFiles().length);
@@ -228,7 +230,8 @@ public class TestFlinkManifest {
"Serialization v1 should not have null data manifest.", delta.dataManifest());
TestHelpers.assertEquals(manifest, delta.dataManifest());
- List<DataFile> actualFiles = FlinkManifestUtil.readDataFiles(delta.dataManifest(), table.io());
+ List<DataFile> actualFiles =
+ FlinkManifestUtil.readDataFiles(delta.dataManifest(), table.io(), table.specs());
Assert.assertEquals(10, actualFiles.size());
for (int i = 0; i < 10; i++) {
TestHelpers.assertEquals(dataFiles.get(i), actualFiles.get(i));
diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
index a4f29d47f4..a2fe092b7c 100644
--- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
+++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.flink.sink;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.flink.sink.IcebergFilesCommitter.MAX_CONTINUOUS_EMPTY_COMMITS;
import static org.apache.iceberg.flink.sink.ManifestOutputFileFactory.FLINK_MANIFEST_LOCATION;
+import static org.assertj.core.api.Assertions.assertThat;
import java.io.File;
import java.io.IOException;
@@ -28,9 +29,14 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
+import java.util.NavigableMap;
+import java.util.SortedMap;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -50,7 +56,9 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.GenericManifestFile;
import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.TableTestBase;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.SimpleDataUtil;
@@ -60,6 +68,7 @@ import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.ThreadPools;
import org.junit.Assert;
@@ -744,7 +753,8 @@ public class TestIcebergFilesCommitter extends TableTestBase {
// 2. Read the data files from manifests and assert.
List<DataFile> dataFiles =
- FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io());
+ FlinkManifestUtil.readDataFiles(
+ createTestingManifestFile(manifestPath), table.io(), table.specs());
Assert.assertEquals(1, dataFiles.size());
TestHelpers.assertEquals(dataFile1, dataFiles.get(0));
@@ -790,7 +800,8 @@ public class TestIcebergFilesCommitter extends TableTestBase {
// 2. Read the data files from manifests and assert.
List<DataFile> dataFiles =
- FlinkManifestUtil.readDataFiles(createTestingManifestFile(manifestPath), table.io());
+ FlinkManifestUtil.readDataFiles(
+ createTestingManifestFile(manifestPath), table.io(), table.specs());
Assert.assertEquals(1, dataFiles.size());
TestHelpers.assertEquals(dataFile1, dataFiles.get(0));
@@ -877,6 +888,112 @@ public class TestIcebergFilesCommitter extends TableTestBase {
}
}
+ @Test
+ public void testSpecEvolution() throws Exception {
+ long timestamp = 0;
+ int checkpointId = 0;
+ List<RowData> rows = Lists.newArrayList();
+ JobID jobId = new JobID();
+
+ OperatorID operatorId;
+ OperatorSubtaskState snapshot;
+ DataFile dataFile;
+ int specId;
+
+ try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
+ harness.setup();
+ harness.open();
+ operatorId = harness.getOperator().getOperatorID();
+
+ assertSnapshotSize(0);
+
+ checkpointId++;
+ RowData rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+ // table unpartitioned
+ dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData));
+ harness.processElement(of(dataFile), ++timestamp);
+ rows.add(rowData);
+ harness.snapshot(checkpointId, ++timestamp);
+
+ specId =
+ getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
+ assertThat(specId).isEqualTo(table.spec().specId());
+
+ harness.notifyOfCompletedCheckpoint(checkpointId);
+
+ // Change partition spec
+ table.refresh();
+ PartitionSpec oldSpec = table.spec();
+ table.updateSpec().addField("id").commit();
+
+ checkpointId++;
+ rowData = SimpleDataUtil.createRowData(checkpointId, "hello" + checkpointId);
+ // write data with old partition spec
+ dataFile = writeDataFile("data-" + checkpointId, ImmutableList.of(rowData), oldSpec, null);
+ harness.processElement(of(dataFile), ++timestamp);
+ rows.add(rowData);
+ snapshot = harness.snapshot(checkpointId, ++timestamp);
+
+ specId =
+ getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
+ assertThat(specId).isEqualTo(oldSpec.specId());
+
+ harness.notifyOfCompletedCheckpoint(checkpointId);
+
+ assertFlinkManifests(0);
+
+ SimpleDataUtil.assertTableRows(table, ImmutableList.copyOf(rows), branch);
+ assertSnapshotSize(checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
+ }
+
+ // Restore from the given snapshot
+ try (OneInputStreamOperatorTestHarness<WriteResult, Void> harness = createStreamSink(jobId)) {
+ harness.getStreamConfig().setOperatorID(operatorId);
+ harness.setup();
+ harness.initializeState(snapshot);
+ harness.open();
+
+ SimpleDataUtil.assertTableRows(table, rows, branch);
+ assertSnapshotSize(checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
+
+ checkpointId++;
+ RowData row = SimpleDataUtil.createRowData(checkpointId, "world" + checkpointId);
+ StructLike partition = new PartitionData(table.spec().partitionType());
+ partition.set(0, checkpointId);
+ dataFile =
+ writeDataFile("data-" + checkpointId, ImmutableList.of(row), table.spec(), partition);
+ harness.processElement(of(dataFile), ++timestamp);
+ rows.add(row);
+ harness.snapshot(checkpointId, ++timestamp);
+ assertFlinkManifests(1);
+
+ specId =
+ getStagingManifestSpecId(harness.getOperator().getOperatorStateBackend(), checkpointId);
+ assertThat(specId).isEqualTo(table.spec().specId());
+
+ harness.notifyOfCompletedCheckpoint(checkpointId);
+ assertFlinkManifests(0);
+
+ SimpleDataUtil.assertTableRows(table, rows, branch);
+ assertSnapshotSize(checkpointId);
+ assertMaxCommittedCheckpointId(jobId, operatorId, checkpointId);
+ }
+ }
+
+ private int getStagingManifestSpecId(OperatorStateStore operatorStateStore, long checkPointId)
+ throws Exception {
+ ListState<SortedMap<Long, byte[]>> checkpointsState =
+ operatorStateStore.getListState(IcebergFilesCommitter.buildStateDescriptor());
+ NavigableMap<Long, byte[]> statedDataFiles =
+ Maps.newTreeMap(checkpointsState.get().iterator().next());
+ DeltaManifests deltaManifests =
+ SimpleVersionedSerialization.readVersionAndDeSerialize(
+ DeltaManifestsSerializer.INSTANCE, statedDataFiles.get(checkPointId));
+ return deltaManifests.dataManifest().partitionSpecId();
+ }
+
private DeleteFile writeEqDeleteFile(
FileAppenderFactory<RowData> appenderFactory, String filename, List<RowData> deletes)
throws IOException {
@@ -949,6 +1066,20 @@ public class TestIcebergFilesCommitter extends TableTestBase {
rows);
}
+ private DataFile writeDataFile(
+ String filename, List<RowData> rows, PartitionSpec spec, StructLike partition)
+ throws IOException {
+ return SimpleDataUtil.writeFile(
+ table,
+ table.schema(),
+ spec,
+ CONF,
+ table.location(),
+ format.addExtension(filename),
+ rows,
+ partition);
+ }
+
private void assertMaxCommittedCheckpointId(JobID jobID, OperatorID operatorID, long expectedId) {
table.refresh();
long actualId =
@@ -964,7 +1095,7 @@ public class TestIcebergFilesCommitter extends TableTestBase {
private OneInputStreamOperatorTestHarness<WriteResult, Void> createStreamSink(JobID jobID)
throws Exception {
- TestOperatorFactory factory = TestOperatorFactory.of(table.location(), branch);
+ TestOperatorFactory factory = TestOperatorFactory.of(table.location(), branch, table.spec());
return new OneInputStreamOperatorTestHarness<>(factory, createEnvironment(jobID));
}
@@ -985,14 +1116,16 @@ public class TestIcebergFilesCommitter extends TableTestBase {
implements OneInputStreamOperatorFactory<WriteResult, Void> {
private final String tablePath;
private final String branch;
+ private final PartitionSpec spec;
- private TestOperatorFactory(String tablePath, String branch) {
+ private TestOperatorFactory(String tablePath, String branch, PartitionSpec spec) {
this.tablePath = tablePath;
this.branch = branch;
+ this.spec = spec;
}
- private static TestOperatorFactory of(String tablePath, String branch) {
- return new TestOperatorFactory(tablePath, branch);
+ private static TestOperatorFactory of(String tablePath, String branch, PartitionSpec spec) {
+ return new TestOperatorFactory(tablePath, branch, spec);
}
@Override
@@ -1005,7 +1138,8 @@ public class TestIcebergFilesCommitter extends TableTestBase {
false,
Collections.singletonMap("flink.test", TestIcebergFilesCommitter.class.getName()),
ThreadPools.WORKER_THREAD_POOL_SIZE,
- branch);
+ branch,
+ spec);
committer.setup(param.getContainingTask(), param.getStreamConfig(), param.getOutput());
return (T) committer;
}