You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/07/28 12:57:00 UTC
[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource
scalable.
[ https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16104906#comment-16104906 ]
ASF GitHub Bot commented on FLINK-6215:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3669#discussion_r130080485
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---
@@ -61,52 +65,89 @@
* @param end End of the range of numbers to emit.
*/
public StatefulSequenceSource(long start, long end) {
+ Preconditions.checkArgument(start <= end);
this.start = start;
this.end = end;
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
- Preconditions.checkState(this.checkpointedState == null,
+ Preconditions.checkState(checkpointedState == null,
"The " + getClass().getSimpleName() + " has already been initialized.");
this.checkpointedState = context.getOperatorStateStore().getOperatorState(
new ListStateDescriptor<>(
- "stateful-sequence-source-state",
- LongSerializer.INSTANCE
+ "stateful-sequence-source-state",
+ new TupleSerializer<>(
+ (Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
+ new TypeSerializer<?>[] { LongSerializer.INSTANCE, LongSerializer.INSTANCE }
+ )
)
);
- this.valuesToEmit = new ArrayDeque<>();
+ this.endToNextOffsetMapping = new HashMap<>();
if (context.isRestored()) {
- // upon restoring
-
- for (Long v : this.checkpointedState.get()) {
- this.valuesToEmit.add(v);
+ for (Tuple2<Long, Long> partitionInfo: checkpointedState.get()) {
+ Long prev = endToNextOffsetMapping.put(partitionInfo.f0, partitionInfo.f1);
+ Preconditions.checkState(prev == null,
+ getClass().getSimpleName() + " : Duplicate entry when restoring.");
}
} else {
- // the first time the job is executed
-
- final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
final int taskIdx = getRuntimeContext().getIndexOfThisSubtask();
- final long congruence = start + taskIdx;
+ final int parallelTasks = getRuntimeContext().getNumberOfParallelSubtasks();
+
+ final long totalElements = Math.abs(end - start + 1L);
+ final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
+ final int totalPartitions = totalElements < Integer.MAX_VALUE ? Math.min(maxParallelism, (int) totalElements) : maxParallelism;
- long totalNoOfElements = Math.abs(end - start + 1);
- final int baseSize = safeDivide(totalNoOfElements, stepSize);
- final int toCollect = (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
+ Tuple2<Integer, Integer> localPartitionRange = getLocalRange(totalPartitions, parallelTasks, taskIdx);
+ int localStartIdx = localPartitionRange.f0;
+ int localEndIdx = localStartIdx + localPartitionRange.f1;
- for (long collected = 0; collected < toCollect; collected++) {
- this.valuesToEmit.add(collected * stepSize + congruence);
+ for (int partIdx = localStartIdx; partIdx < localEndIdx; partIdx++) {
+ Tuple2<Long, Long> limits = getPartitionLimits(totalElements, totalPartitions, partIdx);
+ endToNextOffsetMapping.put(limits.f1, limits.f0);
}
}
}
+ private Tuple2<Integer, Integer> getLocalRange(int totalPartitions, int parallelTasks, int taskIdx) {
+ int minPartitionSliceSize = totalPartitions / parallelTasks;
+ int remainingPartitions = totalPartitions - minPartitionSliceSize * parallelTasks;
+
+ int localRangeStartIdx = taskIdx * minPartitionSliceSize + Math.min(taskIdx, remainingPartitions);
+ int localRangeSize = taskIdx < remainingPartitions ? minPartitionSliceSize + 1 : minPartitionSliceSize;
+
+ return new Tuple2<>(localRangeStartIdx, localRangeSize);
+ }
+
+ private Tuple2<Long, Long> getPartitionLimits(long totalElements, int totalPartitions, long partitionIdx) {
+ long minElementPartitionSize = totalElements / totalPartitions;
+ long remainingElements = totalElements - minElementPartitionSize * totalPartitions;
+ long startOffset = start;
+
+ for (int idx = 0; idx < partitionIdx; idx++) {
+ long partitionSize = idx < remainingElements ? minElementPartitionSize + 1L : minElementPartitionSize;
+ startOffset += partitionSize;
+ }
+
+ long partitionSize = partitionIdx < remainingElements ? minElementPartitionSize + 1L : minElementPartitionSize;
+ return new Tuple2<>(startOffset, startOffset + partitionSize);
+ }
+
@Override
public void run(SourceContext<Long> ctx) throws Exception {
- while (isRunning && !this.valuesToEmit.isEmpty()) {
- synchronized (ctx.getCheckpointLock()) {
- ctx.collect(this.valuesToEmit.poll());
+ for (Map.Entry<Long, Long> partition: endToNextOffsetMapping.entrySet()) {
--- End diff --
nit: spaces should surround ":"
> Make the StatefulSequenceSource scalable.
> -----------------------------------------
>
> Key: FLINK-6215
> URL: https://issues.apache.org/jira/browse/FLINK-6215
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.3.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.4.0
>
>
> Currently the {{StatefulSequenceSource}} instantiates all the elements to emit first and keeps them in memory. This is not scalable as for large sequences of elements this can lead to out of memory exceptions.
> To solve this, we can pre-partition the sequence of elements based on the {{maxParallelism}} parameter, and just keep state (to checkpoint) per such partition.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)