You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/11 16:33:40 UTC
[flink] 10/10: [FLINK-18680][connectors] Make connector base
RecordsWithSplitIds more lightweight.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f42a3ebc3e81a034b7221a803c153636fef34903
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 7 00:20:38 2020 +0200
[FLINK-18680][connectors] Make connector base RecordsWithSplitIds more lightweight.
This turns the RecordsWithSplitIds structure from a holder of materialized collections to
a simple iterator-like structure to allow for lazy materialization and object reuse.
---
.../base/source/reader/RecordsBySplits.java | 179 +++++++++++++--------
.../base/source/reader/RecordsWithSplitIds.java | 20 +--
.../base/source/reader/SourceReaderBase.java | 117 +++++++++-----
.../base/source/reader/SplitsRecordIterator.java | 100 ------------
.../source/reader/fetcher/SplitFetcherTest.java | 10 +-
.../base/source/reader/mocks/MockSplitReader.java | 5 +-
.../reader/mocks/TestingRecordsWithSplitIds.java | 32 +---
7 files changed, 212 insertions(+), 251 deletions(-)
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
index 77cb594..0b15432 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
@@ -20,83 +20,59 @@ package org.apache.flink.connector.base.source.reader;
import org.apache.flink.api.connector.source.SourceSplit;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* An implementation of RecordsWithSplitIds to host all the records by splits.
*/
public class RecordsBySplits<E> implements RecordsWithSplitIds<E> {
- private Map<String, Collection<E>> recordsBySplits = new LinkedHashMap<>();
- private Set<String> finishedSplits = new HashSet<>();
- /**
- * Add the record from the given split ID.
- *
- * @param splitId the split ID the record was from.
- * @param record the record to add.
- */
- public void add(String splitId, E record) {
- recordsBySplits.computeIfAbsent(splitId, sid -> new ArrayList<>()).add(record);
- }
+ private final Set<String> finishedSplits;
- /**
- * Add the record from the given source split.
- *
- * @param split the source split the record was from.
- * @param record the record to add.
- */
- public void add(SourceSplit split, E record) {
- add(split.splitId(), record);
- }
+ private final Iterator<Map.Entry<String, Collection<E>>> splitsIterator;
- /**
- * Add multiple records from the given split ID.
- *
- * @param splitId the split ID given the records were from.
- * @param records the records to add.
- */
- public void addAll(String splitId, Collection<E> records) {
- this.recordsBySplits.compute(splitId, (id, r) -> {
- if (r == null) {
- r = records;
- } else {
- r.addAll(records);
- }
- return r;
- });
- }
+ @Nullable
+ private Iterator<E> recordsInCurrentSplit;
- /**
- * Add multiple records from the given source split.
- *
- * @param split the source split the records were from.
- * @param records the records to add.
- */
- public void addAll(SourceSplit split, Collection<E> records) {
- addAll(split.splitId(), records);
+ public RecordsBySplits(
+ final Map<String, Collection<E>> recordsBySplit,
+ final Set<String> finishedSplits) {
+
+ this.splitsIterator = checkNotNull(recordsBySplit, "recordsBySplit").entrySet().iterator();
+ this.finishedSplits = checkNotNull(finishedSplits, "finishedSplits");
}
- /**
- * Mark the split with the given ID as finished.
- *
- * @param splitId the ID of the finished split.
- */
- public void addFinishedSplit(String splitId) {
- finishedSplits.add(splitId);
+ @Nullable
+ @Override
+ public String nextSplit() {
+ if (splitsIterator.hasNext()) {
+ final Map.Entry<String, Collection<E>> next = splitsIterator.next();
+ recordsInCurrentSplit = next.getValue().iterator();
+ return next.getKey();
+ } else {
+ return null;
+ }
}
- /**
- * Mark multiple splits with the given IDs as finished.
- *
- * @param splitIds the IDs of the finished splits.
- */
- public void addFinishedSplits(Collection<String> splitIds) {
- finishedSplits.addAll(splitIds);
+ @Nullable
+ @Override
+ public E nextRecordFromSplit() {
+ if (recordsInCurrentSplit == null) {
+ throw new IllegalStateException();
+ }
+
+ return recordsInCurrentSplit.hasNext() ? recordsInCurrentSplit.next() : null;
}
@Override
@@ -104,13 +80,86 @@ public class RecordsBySplits<E> implements RecordsWithSplitIds<E> {
return finishedSplits;
}
- @Override
- public Collection<String> splitIds() {
- return recordsBySplits.keySet();
- }
+ // ------------------------------------------------------------------------
- @Override
- public Map<String, Collection<E>> recordsBySplits() {
- return recordsBySplits;
+ /**
+ * A utility builder to collect records in individual calls, rather than put a finished collection
+ * in the {@link RecordsBySplits#RecordsBySplits(Map, Set)} constructor.
+ */
+ public static class Builder<E> {
+
+ private final Map<String, Collection<E>> recordsBySplits = new LinkedHashMap<>();
+ private final Set<String> finishedSplits = new HashSet<>(2);
+
+ /**
+ * Add the record from the given split ID.
+ *
+ * @param splitId the split ID the record was from.
+ * @param record the record to add.
+ */
+ public void add(String splitId, E record) {
+ recordsBySplits.computeIfAbsent(splitId, sid -> new ArrayList<>()).add(record);
+ }
+
+ /**
+ * Add the record from the given source split.
+ *
+ * @param split the source split the record was from.
+ * @param record the record to add.
+ */
+ public void add(SourceSplit split, E record) {
+ add(split.splitId(), record);
+ }
+
+ /**
+ * Add multiple records from the given split ID.
+ *
+ * @param splitId the split ID given the records were from.
+ * @param records the records to add.
+ */
+ public void addAll(String splitId, Collection<E> records) {
+ this.recordsBySplits.compute(splitId, (id, r) -> {
+ if (r == null) {
+ r = records;
+ } else {
+ r.addAll(records);
+ }
+ return r;
+ });
+ }
+
+ /**
+ * Add multiple records from the given source split.
+ *
+ * @param split the source split the records were from.
+ * @param records the records to add.
+ */
+ public void addAll(SourceSplit split, Collection<E> records) {
+ addAll(split.splitId(), records);
+ }
+
+ /**
+ * Mark the split with the given ID as finished.
+ *
+ * @param splitId the ID of the finished split.
+ */
+ public void addFinishedSplit(String splitId) {
+ finishedSplits.add(splitId);
+ }
+
+ /**
+ * Mark multiple splits with the given IDs as finished.
+ *
+ * @param splitIds the IDs of the finished splits.
+ */
+ public void addFinishedSplits(Collection<String> splitIds) {
+ finishedSplits.addAll(splitIds);
+ }
+
+ public RecordsBySplits<E> build() {
+ return new RecordsBySplits<>(
+ recordsBySplits.isEmpty() ? Collections.emptyMap() : recordsBySplits,
+ finishedSplits.isEmpty() ? Collections.emptySet() : finishedSplits);
+ }
}
}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
index dc915b3..0c8fd07 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
@@ -18,8 +18,8 @@
package org.apache.flink.connector.base.source.reader;
-import java.util.Collection;
-import java.util.Map;
+import javax.annotation.Nullable;
+
import java.util.Set;
/**
@@ -28,18 +28,18 @@ import java.util.Set;
public interface RecordsWithSplitIds<E> {
/**
- * Get all the split ids.
- *
- * @return a collection of split ids.
+ * Moves to the next split. This method is also called initially to move to the
+ * first split. Returns null, if no splits are left.
*/
- Collection<String> splitIds();
+ @Nullable
+ String nextSplit();
/**
- * Get all the records by Splits.
- *
- * @return a mapping from split ids to the records.
+ * Gets the next record from the current split. Returns null if no more records are left
+ * in this split.
*/
- Map<String, Collection<E>> recordsBySplits();
+ @Nullable
+ E nextRecordFromSplit();
/**
* Get the finished splits.
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 4a41d49..02b7a7c 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -35,14 +35,19 @@ import org.apache.flink.core.io.InputStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
+import static org.apache.flink.util.Preconditions.checkState;
+
/**
* An abstract implementation of {@link SourceReader} which provides some sychronization between
* the mail box main thread and the SourceReader internal threads. This class allows user to
@@ -81,8 +86,10 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
/** The context of this source reader. */
protected SourceReaderContext context;
- /** The last element to ensure it is fully handled. */
- private SplitsRecordIterator<E> splitIter;
+ /** The latest fetched batch of records-by-split from the split reader. */
+ @Nullable private RecordsWithSplitIds<E> currentFetch;
+ @Nullable private SplitContext<T, SplitStateT> currentSplitContext;
+ @Nullable private SourceOutput<T> currentSplitOutput;
/** Indicating whether the SourceReader will be assigned more splits or not.*/
private boolean noMoreSplitsAssignment;
@@ -99,7 +106,6 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
this.splitFetcherManager = splitFetcherManager;
this.recordEmitter = recordEmitter;
this.splitStates = new HashMap<>();
- this.splitIter = null;
this.options = new SourceReaderOptions(config);
this.config = config;
this.context = context;
@@ -107,60 +113,87 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
}
@Override
- public void start() {
-
- }
+ public void start() {}
@Override
public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
splitFetcherManager.checkErrors();
- // poll from the queue if the last element was successfully handled. Otherwise
- // just pass the last element again.
- RecordsWithSplitIds<E> recordsWithSplitId = null;
- boolean newFetch = splitIter == null || !splitIter.hasNext();
- if (newFetch) {
- recordsWithSplitId = elementsQueue.poll();
+
+ // make sure we have a fetch we are working on, or move to the next
+ final RecordsWithSplitIds<E> recordsWithSplitId = getCurrentOrNewFetch(output);
+ if (recordsWithSplitId == null) {
+ return trace(finishedOrAvailableLater());
}
- InputStatus status;
- if (newFetch && recordsWithSplitId == null) {
- // No element available, set to available later if needed.
- status = finishedOrAvailableLater();
- } else {
- // Update the record iterator if it is a new fetch.
- if (newFetch) {
- splitIter = new SplitsRecordIterator<>(recordsWithSplitId);
- }
+ // we need to loop here, because we may have to go across splits
+ while (true) {
// Process one record.
- if (splitIter.hasNext()) {
+ final E record = recordsWithSplitId.nextRecordFromSplit();
+ if (record != null) {
// emit the record.
- final E record = splitIter.next();
- final SplitContext<T, SplitStateT> splitContext = splitStates.get(splitIter.currentSplitId());
- final SourceOutput<T> splitOutput = splitContext.getOrCreateSplitOutput(output);
- recordEmitter.emitRecord(record, splitOutput, splitContext.state);
+ recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
LOG.trace("Emitted record: {}", record);
+ return trace(InputStatus.MORE_AVAILABLE);
}
- // Do some cleanup if the all the records in the current splitIter have been processed.
- if (!splitIter.hasNext()) {
- // First remove the state of the split.
- splitIter.finishedSplitIds().forEach((id) -> {
- splitStates.remove(id);
- output.releaseOutputForSplit(id);
- });
- // Handle the finished splits.
- onSplitFinished(splitIter.finishedSplitIds());
- splitIter.dispose();
- // Prepare the return status based on the availability of the next element.
- status = finishedOrAvailableLater();
- } else {
- // There are more records from the current splitIter.
- status = InputStatus.MORE_AVAILABLE;
+ else if (!moveToNextSplit(recordsWithSplitId, output)) {
+ return trace(finishedOrAvailableLater());
}
+ // else fall through the loop
}
+ }
+
+ private InputStatus trace(InputStatus status) {
LOG.trace("Source reader status: {}", status);
return status;
}
+ @Nullable
+ private RecordsWithSplitIds<E> getCurrentOrNewFetch(final ReaderOutput<T> output) {
+ RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
+ if (recordsWithSplitId != null) {
+ return recordsWithSplitId;
+ }
+
+ recordsWithSplitId = elementsQueue.poll();
+ if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, output)) {
+ // No element available, set to available later if needed.
+ return null;
+ }
+
+ currentFetch = recordsWithSplitId;
+ return recordsWithSplitId;
+ }
+
+ private void finishCurrentFetch(final RecordsWithSplitIds<E> fetch, final ReaderOutput<T> output) {
+ currentFetch = null;
+ currentSplitContext = null;
+ currentSplitOutput = null;
+
+ final Set<String> finishedSplits = fetch.finishedSplits();
+ if (!finishedSplits.isEmpty()) {
+ for (String finishedSplitId : finishedSplits) {
+ splitStates.remove(finishedSplitId);
+ output.releaseOutputForSplit(finishedSplitId);
+ }
+ onSplitFinished(finishedSplits);
+ }
+
+ fetch.recycle();
+ }
+
+ private boolean moveToNextSplit(RecordsWithSplitIds<E> recordsWithSplitIds, ReaderOutput<T> output) {
+ final String nextSplitId = recordsWithSplitIds.nextSplit();
+ if (nextSplitId == null) {
+ finishCurrentFetch(recordsWithSplitIds, output);
+ return false;
+ }
+
+ currentSplitContext = splitStates.get(nextSplitId);
+ checkState(currentSplitContext != null, "Have records for a split that was not registered");
+ currentSplitOutput = currentSplitContext.getOrCreateSplitOutput(output);
+ return true;
+ }
+
@Override
public CompletableFuture<Void> isAvailable() {
// The order matters here. We first get the future. After this point, if the queue
@@ -235,7 +268,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
private InputStatus finishedOrAvailableLater() {
boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers();
- boolean allElementsEmitted = elementsQueue.isEmpty() && (splitIter == null || !splitIter.hasNext());
+ boolean allElementsEmitted = elementsQueue.isEmpty();
if (noMoreSplitsAssignment && allFetchersHaveShutdown && allElementsEmitted) {
return InputStatus.END_OF_INPUT;
} else {
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
deleted file mode 100644
index c83bec0..0000000
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.base.source.reader;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-/**
- * A class that wraps around a {@link RecordsWithSplitIds} and provide a consistent iterator.
- */
-public class SplitsRecordIterator<E> {
-
- private final RecordsWithSplitIds<E> recordsWithSplitIds;
- private final Iterator<Map.Entry<String, Collection<E>>> splitIter;
- private String currentSplitId;
- private Iterator<E> recordsIter;
-
- /**
- * Construct a cross-splits iterator for the records.
- *
- * @param recordsWithSplitIds the records by splits.
- */
- public SplitsRecordIterator(RecordsWithSplitIds<E> recordsWithSplitIds) {
- this.recordsWithSplitIds = recordsWithSplitIds;
- Map<String, Collection<E>> recordsBySplits = recordsWithSplitIds.recordsBySplits();
- // Remove empty splits;
- recordsBySplits.entrySet().removeIf(e -> e.getValue().isEmpty());
- this.splitIter = recordsBySplits.entrySet().iterator();
- }
-
- /**
- * Whether their are more records available.
- *
- * @return true if there are more records, false otherwise.
- */
- public boolean hasNext() {
- if (recordsIter == null || !recordsIter.hasNext()) {
- if (splitIter.hasNext()) {
- Map.Entry<String, Collection<E>> entry = splitIter.next();
- currentSplitId = entry.getKey();
- recordsIter = entry.getValue().iterator();
- } else {
- return false;
- }
- }
- return recordsIter.hasNext() || splitIter.hasNext();
- }
-
- /**
- * Get the next record.
- * @return the next record.
- */
- public E next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- return recordsIter.next();
- }
-
- /**
- * Get the split id of the last returned record.
- *
- * @return the split id of the last returned record.
- */
- public String currentSplitId() {
- return currentSplitId;
- }
-
- /**
- * The split Ids that are finished after all the records in this iterator are emitted.
- *
- * @return a set of finished split Ids.
- */
- public Set<String> finishedSplitIds() {
- return recordsWithSplitIds.finishedSplits();
- }
-
- public void dispose() {
- recordsWithSplitIds.recycle();
- }
-}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index 953d7aa..e9c2ad2 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -95,9 +95,13 @@ public class SplitFetcherTest {
interrupter.start();
while (recordsRead.size() < NUM_SPLITS * NUM_RECORDS_PER_SPLIT) {
- elementQueue.take().recordsBySplits().values().forEach(records ->
- // Ensure there is no duplicate records.
- records.forEach(arr -> assertTrue(recordsRead.add(arr[0]))));
+ final RecordsWithSplitIds<int[]> nextBatch = elementQueue.take();
+ while (nextBatch.nextSplit() != null) {
+ int[] arr;
+ while ((arr = nextBatch.nextRecordFromSplit()) != null) {
+ assertTrue(recordsRead.add(arr[0]));
+ }
+ }
}
assertEquals(NUM_TOTAL_RECORDS, recordsRead.size());
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
index 3c6d8df..00d4d71 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
@@ -81,7 +81,8 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
}
private RecordsBySplits<int[]> getRecords() {
- RecordsBySplits<int[]> records = new RecordsBySplits<>();
+ final RecordsBySplits.Builder<int[]> records = new RecordsBySplits.Builder<>();
+
try {
for (Map.Entry<String, MockSourceSplit> entry : splits.entrySet()) {
MockSourceSplit split = entry.getValue();
@@ -102,6 +103,6 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
throw new RuntimeException("Caught unexpected interrupted exception.");
}
}
- return records;
+ return records.build();
}
}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java
index 2a8377a..3aa49ac 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java
@@ -18,48 +18,22 @@
package org.apache.flink.connector.base.source.reader.mocks;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A mock implementation of {@link RecordsWithSplitIds} that returns a given set of records.
*/
-public class TestingRecordsWithSplitIds<E> implements RecordsWithSplitIds<E> {
-
- private final Map<String, Collection<E>> records;
-
- private final String splitId;
+public class TestingRecordsWithSplitIds<E> extends RecordsBySplits<E> {
private volatile boolean isRecycled;
@SafeVarargs
public TestingRecordsWithSplitIds(String splitId, E... records) {
- this.splitId = checkNotNull(splitId);
- this.records = new HashMap<>();
- this.records.put(splitId, Arrays.asList(records));
- }
-
- @Override
- public Collection<String> splitIds() {
- return Collections.singleton(splitId);
- }
-
- @Override
- public Map<String, Collection<E>> recordsBySplits() {
- return records;
- }
-
- @Override
- public Set<String> finishedSplits() {
- return Collections.emptySet();
+ super(Collections.singletonMap(splitId, Arrays.asList(records)), Collections.emptySet());
}
@Override