Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2017/04/04 09:47:13 UTC

[GitHub] flink pull request #3669: [FLINK-6215] Make the StatefulSequenceSource scala...

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/3669

    [FLINK-6215] Make the StatefulSequenceSource scalable.

    So far this source computed all the elements to be
    emitted up front and stored them in memory. This could
    lead to out-of-memory problems for large deployments.
    Now we split the range of elements into partitions that
    can be re-shuffled upon rescaling, and upon checkpointing
    we store only the next offset and the end of each partition.
    
    The current version of the PR is not backwards compatible,
    as this becomes tricky given that we change the semantics
    of the state that we store.
    
    I believe that this is ok, given that it is a fix that has to go into
    1.3 and we are not sure whether people are actually using it in
    production, i.e. in settings that need backwards compatibility.
    
    What do you think @aljoscha @StephanEwen ?
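
For illustration, the range split described above can be sketched as follows. This is a minimal sketch under stated assumptions, not the code from the PR: the class name RangePartitionSketch and the method split are hypothetical, partition ends are assumed to be exclusive, and end - start + 1 is assumed not to overflow a long.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: splits the inclusive range
// [start, end] into totalPartitions contiguous partitions.
class RangePartitionSketch {

    /** Returns one {nextOffset, endExclusive} pair per partition. */
    public static List<long[]> split(long start, long end, int totalPartitions) {
        if (start > end || totalPartitions <= 0) {
            throw new IllegalArgumentException("need start <= end and totalPartitions > 0");
        }
        // Assumption: the range is small enough that this does not overflow.
        long totalElements = end - start + 1L;
        long minSize = totalElements / totalPartitions;
        long remainder = totalElements % totalPartitions;

        List<long[]> partitions = new ArrayList<>();
        long next = start;
        for (int idx = 0; idx < totalPartitions; idx++) {
            // The first `remainder` partitions take one extra element each.
            long size = idx < remainder ? minSize + 1L : minSize;
            partitions.add(new long[] {next, next + size});
            next += size;
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Prints [0, 3), [3, 6), [6, 8), [8, 10), one per line.
        for (long[] p : split(0L, 9L, 4)) {
            System.out.println("[" + p[0] + ", " + p[1] + ")");
        }
    }
}
```

Only two longs are kept per partition, regardless of how many elements the partition covers, which is what keeps the checkpointed state small.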
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink stateful-src

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3669.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3669
    
----
commit cf333b0b0c318569a1704ca71121c37dcd12bd3d
Author: kl0u <kk...@gmail.com>
Date:   2017-03-29T16:21:02Z

    [FLINK-6215] Make the StatefulSequenceSource scalable.
    
    So far this source was computing all the elements to
    be emitted and stored them in memory. This could lead
    to out-of-memory problems for large deployments. Now
    we do split the range of elements into partitions that
    can be re-shuffled upon rescaling and we just store
    the next offset and the end of each one of them upon
    checkpointing.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3669: [FLINK-6215] Make the StatefulSequenceSource scalable.

Posted by greghogan <gi...@git.apache.org>.
GitHub user greghogan commented on the issue:

    https://github.com/apache/flink/pull/3669
  
    @kl0u how important is this for 1.3?



[GitHub] flink issue #3669: [FLINK-6215] Make the StatefulSequenceSource scalable.

Posted by StephanEwen <gi...@git.apache.org>.
GitHub user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/3669
  
    I think this is crucial. As it stands in the code right now, the source does not work for any meaningful test.
    
    +1 for putting this into 1.3 as a bugfix



[GitHub] flink pull request #3669: [FLINK-6215] Make the StatefulSequenceSource scala...

Posted by tzulitai <gi...@git.apache.org>.
GitHub user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3669#discussion_r130080485
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---
    @@ -61,52 +65,89 @@
     	 * @param end End of the range of numbers to emit.
     	 */
     	public StatefulSequenceSource(long start, long end) {
    +		Preconditions.checkArgument(start <= end);
     		this.start = start;
     		this.end = end;
     	}
     
     	@Override
     	public void initializeState(FunctionInitializationContext context) throws Exception {
     
    -		Preconditions.checkState(this.checkpointedState == null,
    +		Preconditions.checkState(checkpointedState == null,
     			"The " + getClass().getSimpleName() + " has already been initialized.");
     
     		this.checkpointedState = context.getOperatorStateStore().getOperatorState(
     			new ListStateDescriptor<>(
    -				"stateful-sequence-source-state",
    -				LongSerializer.INSTANCE
    +				"stateful-sequence-source-state", 
    +					new TupleSerializer<>(
    +							(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
    +							new TypeSerializer<?>[] { LongSerializer.INSTANCE, LongSerializer.INSTANCE }
    +					)
     			)
     		);
     
    -		this.valuesToEmit = new ArrayDeque<>();
    +		this.endToNextOffsetMapping = new HashMap<>();
     		if (context.isRestored()) {
    -			// upon restoring
    -
    -			for (Long v : this.checkpointedState.get()) {
    -				this.valuesToEmit.add(v);
    +			for (Tuple2<Long, Long> partitionInfo: checkpointedState.get()) {
    +				Long prev = endToNextOffsetMapping.put(partitionInfo.f0, partitionInfo.f1);
    +				Preconditions.checkState(prev == null,
    +						getClass().getSimpleName() + " : Duplicate entry when restoring.");
     			}
     		} else {
    -			// the first time the job is executed
    -
    -			final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
     			final int taskIdx = getRuntimeContext().getIndexOfThisSubtask();
    -			final long congruence = start + taskIdx;
    +			final int parallelTasks = getRuntimeContext().getNumberOfParallelSubtasks();
    +
    +			final long totalElements = Math.abs(end - start + 1L);
    +			final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
    +			final int totalPartitions = totalElements < Integer.MAX_VALUE ? Math.min(maxParallelism, (int) totalElements) : maxParallelism;
     
    -			long totalNoOfElements = Math.abs(end - start + 1);
    -			final int baseSize = safeDivide(totalNoOfElements, stepSize);
    -			final int toCollect = (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
    +			Tuple2<Integer, Integer> localPartitionRange = getLocalRange(totalPartitions, parallelTasks, taskIdx);
    +			int localStartIdx = localPartitionRange.f0;
    +			int localEndIdx = localStartIdx + localPartitionRange.f1;
     
    -			for (long collected = 0; collected < toCollect; collected++) {
    -				this.valuesToEmit.add(collected * stepSize + congruence);
    +			for (int partIdx = localStartIdx; partIdx < localEndIdx; partIdx++) {
    +				Tuple2<Long, Long> limits = getPartitionLimits(totalElements, totalPartitions, partIdx);
    +				endToNextOffsetMapping.put(limits.f1, limits.f0);
     			}
     		}
     	}
     
    +	private Tuple2<Integer, Integer> getLocalRange(int totalPartitions, int parallelTasks, int taskIdx) {
    +		int minPartitionSliceSize = totalPartitions / parallelTasks;
    +		int remainingPartitions = totalPartitions - minPartitionSliceSize * parallelTasks;
    +
    +		int localRangeStartIdx = taskIdx * minPartitionSliceSize + Math.min(taskIdx, remainingPartitions);
    +		int localRangeSize = taskIdx < remainingPartitions ? minPartitionSliceSize + 1 : minPartitionSliceSize;
    +
    +		return new Tuple2<>(localRangeStartIdx, localRangeSize);
    +	}
    +
    +	private Tuple2<Long, Long> getPartitionLimits(long totalElements, int totalPartitions, long partitionIdx) {
    +		long minElementPartitionSize = totalElements / totalPartitions;
    +		long remainingElements = totalElements - minElementPartitionSize * totalPartitions;
    +		long startOffset = start;
    +
    +		for (int idx = 0; idx < partitionIdx; idx++) {
    +			long partitionSize = idx < remainingElements ? minElementPartitionSize + 1L : minElementPartitionSize;
    +			startOffset += partitionSize;
    +		}
    +
    +		long partitionSize = partitionIdx < remainingElements ? minElementPartitionSize + 1L : minElementPartitionSize;
    +		return new Tuple2<>(startOffset, startOffset + partitionSize);
    +	}
    +
     	@Override
     	public void run(SourceContext<Long> ctx) throws Exception {
    -		while (isRunning && !this.valuesToEmit.isEmpty()) {
    -			synchronized (ctx.getCheckpointLock()) {
    -				ctx.collect(this.valuesToEmit.poll());
    +		for (Map.Entry<Long, Long> partition: endToNextOffsetMapping.entrySet()) {
    --- End diff --
    
    nit: there should be a space on each side of the ":"



[GitHub] flink pull request #3669: [FLINK-6215] Make the StatefulSequenceSource scala...

Posted by tzulitai <gi...@git.apache.org>.
GitHub user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3669#discussion_r130081125
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---
    @@ -61,52 +65,89 @@
     	 * @param end End of the range of numbers to emit.
     	 */
     	public StatefulSequenceSource(long start, long end) {
    +		Preconditions.checkArgument(start <= end);
     		this.start = start;
     		this.end = end;
     	}
     
     	@Override
     	public void initializeState(FunctionInitializationContext context) throws Exception {
     
    -		Preconditions.checkState(this.checkpointedState == null,
    +		Preconditions.checkState(checkpointedState == null,
     			"The " + getClass().getSimpleName() + " has already been initialized.");
     
     		this.checkpointedState = context.getOperatorStateStore().getOperatorState(
     			new ListStateDescriptor<>(
    -				"stateful-sequence-source-state",
    -				LongSerializer.INSTANCE
    +				"stateful-sequence-source-state", 
    +					new TupleSerializer<>(
    +							(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
    +							new TypeSerializer<?>[] { LongSerializer.INSTANCE, LongSerializer.INSTANCE }
    +					)
     			)
     		);
     
    -		this.valuesToEmit = new ArrayDeque<>();
    +		this.endToNextOffsetMapping = new HashMap<>();
     		if (context.isRestored()) {
    -			// upon restoring
    -
    -			for (Long v : this.checkpointedState.get()) {
    -				this.valuesToEmit.add(v);
    +			for (Tuple2<Long, Long> partitionInfo: checkpointedState.get()) {
    --- End diff --
    
    nit: there should be a space on each side of the ":"



[GitHub] flink issue #3669: [FLINK-6215] Make the StatefulSequenceSource scalable.

Posted by kl0u <gi...@git.apache.org>.
GitHub user kl0u commented on the issue:

    https://github.com/apache/flink/pull/3669
  
    @greghogan Not sure. As I understand it, this source was made for testing (which is why the old implementation was not efficient), but there may also be some more serious users. Any comments @aljoscha?



[GitHub] flink pull request #3669: [FLINK-6215] Make the StatefulSequenceSource scala...

Posted by tzulitai <gi...@git.apache.org>.
GitHub user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3669#discussion_r130081876
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---
    @@ -61,52 +65,89 @@
     	 * @param end End of the range of numbers to emit.
     	 */
     	public StatefulSequenceSource(long start, long end) {
    +		Preconditions.checkArgument(start <= end);
     		this.start = start;
     		this.end = end;
     	}
     
     	@Override
     	public void initializeState(FunctionInitializationContext context) throws Exception {
     
    -		Preconditions.checkState(this.checkpointedState == null,
    +		Preconditions.checkState(checkpointedState == null,
     			"The " + getClass().getSimpleName() + " has already been initialized.");
     
     		this.checkpointedState = context.getOperatorStateStore().getOperatorState(
     			new ListStateDescriptor<>(
    -				"stateful-sequence-source-state",
    -				LongSerializer.INSTANCE
    +				"stateful-sequence-source-state", 
    +					new TupleSerializer<>(
    +							(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
    +							new TypeSerializer<?>[] { LongSerializer.INSTANCE, LongSerializer.INSTANCE }
    +					)
     			)
     		);
     
    -		this.valuesToEmit = new ArrayDeque<>();
    +		this.endToNextOffsetMapping = new HashMap<>();
     		if (context.isRestored()) {
    -			// upon restoring
    -
    -			for (Long v : this.checkpointedState.get()) {
    -				this.valuesToEmit.add(v);
    +			for (Tuple2<Long, Long> partitionInfo: checkpointedState.get()) {
    +				Long prev = endToNextOffsetMapping.put(partitionInfo.f0, partitionInfo.f1);
    +				Preconditions.checkState(prev == null,
    +						getClass().getSimpleName() + " : Duplicate entry when restoring.");
     			}
     		} else {
    -			// the first time the job is executed
    -
    -			final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
     			final int taskIdx = getRuntimeContext().getIndexOfThisSubtask();
    -			final long congruence = start + taskIdx;
    +			final int parallelTasks = getRuntimeContext().getNumberOfParallelSubtasks();
    +
    +			final long totalElements = Math.abs(end - start + 1L);
    +			final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
    +			final int totalPartitions = totalElements < Integer.MAX_VALUE ? Math.min(maxParallelism, (int) totalElements) : maxParallelism;
     
    -			long totalNoOfElements = Math.abs(end - start + 1);
    -			final int baseSize = safeDivide(totalNoOfElements, stepSize);
    -			final int toCollect = (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
    +			Tuple2<Integer, Integer> localPartitionRange = getLocalRange(totalPartitions, parallelTasks, taskIdx);
    +			int localStartIdx = localPartitionRange.f0;
    +			int localEndIdx = localStartIdx + localPartitionRange.f1;
     
    -			for (long collected = 0; collected < toCollect; collected++) {
    -				this.valuesToEmit.add(collected * stepSize + congruence);
    +			for (int partIdx = localStartIdx; partIdx < localEndIdx; partIdx++) {
    +				Tuple2<Long, Long> limits = getPartitionLimits(totalElements, totalPartitions, partIdx);
    +				endToNextOffsetMapping.put(limits.f1, limits.f0);
     			}
     		}
     	}
     
    +	private Tuple2<Integer, Integer> getLocalRange(int totalPartitions, int parallelTasks, int taskIdx) {
    +		int minPartitionSliceSize = totalPartitions / parallelTasks;
    +		int remainingPartitions = totalPartitions - minPartitionSliceSize * parallelTasks;
    +
    +		int localRangeStartIdx = taskIdx * minPartitionSliceSize + Math.min(taskIdx, remainingPartitions);
    +		int localRangeSize = taskIdx < remainingPartitions ? minPartitionSliceSize + 1 : minPartitionSliceSize;
    +
    +		return new Tuple2<>(localRangeStartIdx, localRangeSize);
    +	}
    +
    +	private Tuple2<Long, Long> getPartitionLimits(long totalElements, int totalPartitions, long partitionIdx) {
    +		long minElementPartitionSize = totalElements / totalPartitions;
    +		long remainingElements = totalElements - minElementPartitionSize * totalPartitions;
    +		long startOffset = start;
    +
    +		for (int idx = 0; idx < partitionIdx; idx++) {
    +			long partitionSize = idx < remainingElements ? minElementPartitionSize + 1L : minElementPartitionSize;
    +			startOffset += partitionSize;
    +		}
    +
    +		long partitionSize = partitionIdx < remainingElements ? minElementPartitionSize + 1L : minElementPartitionSize;
    +		return new Tuple2<>(startOffset, startOffset + partitionSize);
    +	}
    +
     	@Override
     	public void run(SourceContext<Long> ctx) throws Exception {
    -		while (isRunning && !this.valuesToEmit.isEmpty()) {
    -			synchronized (ctx.getCheckpointLock()) {
    -				ctx.collect(this.valuesToEmit.poll());
    +		for (Map.Entry<Long, Long> partition: endToNextOffsetMapping.entrySet()) {
    --- End diff --
    
    I wonder if it would make sense to emit local ranges in order of increasing "end offsets". That way the values emitted by each subtask are at least still always increasing.
    While we can't really guarantee ordering with this new rescalable implementation, we could still make a best effort locally. What do you think?
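
One way to sketch the suggestion above is to keep the mapping in a sorted map, so that iteration follows increasing end offsets. This is a minimal sketch under stated assumptions, not the code from the PR: the keys are assumed to be exclusive end offsets and the values the next offset to emit, the class OrderedPartitionEmitSketch and the method emitAll are hypothetical, and the SourceContext and checkpoint lock of the real source are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Flink.
class OrderedPartitionEmitSketch {

    /**
     * Emits every remaining value of every local partition, visiting the
     * partitions in order of increasing end offset (the TreeMap key order).
     * The map is updated in place, so it always holds the next offset to
     * emit per partition, which is what a checkpoint would snapshot.
     */
    public static List<Long> emitAll(TreeMap<Long, Long> endToNextOffset) {
        List<Long> emitted = new ArrayList<>();
        for (Map.Entry<Long, Long> partition : endToNextOffset.entrySet()) {
            long end = partition.getKey();
            long next = partition.getValue();
            while (next < end) {
                emitted.add(next);
                next++;
                // Record progress; setValue is not a structural modification,
                // so it is safe while iterating over the entry set.
                partition.setValue(next);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> local = new TreeMap<>();
        local.put(10L, 8L); // partition [8, 10)
        local.put(3L, 1L);  // partition [1, 3)
        System.out.println(emitAll(local)); // prints [1, 2, 8, 9]
    }
}
```

With a HashMap the iteration order over the same two partitions is unspecified, so the locally emitted values would not be guaranteed to increase.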

