Posted to issues@flink.apache.org by tzulitai <gi...@git.apache.org> on 2017/07/28 12:56:13 UTC

[GitHub] flink pull request #3669: [FLINK-6215] Make the StatefulSequenceSource scala...

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3669#discussion_r130080485
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java ---
    @@ -61,52 +65,89 @@
     	 * @param end End of the range of numbers to emit.
     	 */
     	public StatefulSequenceSource(long start, long end) {
    +		Preconditions.checkArgument(start <= end);
     		this.start = start;
     		this.end = end;
     	}
     
     	@Override
     	public void initializeState(FunctionInitializationContext context) throws Exception {
     
    -		Preconditions.checkState(this.checkpointedState == null,
    +		Preconditions.checkState(checkpointedState == null,
     			"The " + getClass().getSimpleName() + " has already been initialized.");
     
     		this.checkpointedState = context.getOperatorStateStore().getOperatorState(
     			new ListStateDescriptor<>(
    -				"stateful-sequence-source-state",
    -				LongSerializer.INSTANCE
    +				"stateful-sequence-source-state", 
    +					new TupleSerializer<>(
    +							(Class<Tuple2<Long, Long>>) (Class<?>) Tuple2.class,
    +							new TypeSerializer<?>[] { LongSerializer.INSTANCE, LongSerializer.INSTANCE }
    +					)
     			)
     		);
     
    -		this.valuesToEmit = new ArrayDeque<>();
    +		this.endToNextOffsetMapping = new HashMap<>();
     		if (context.isRestored()) {
    -			// upon restoring
    -
    -			for (Long v : this.checkpointedState.get()) {
    -				this.valuesToEmit.add(v);
    +			for (Tuple2<Long, Long> partitionInfo: checkpointedState.get()) {
    +				Long prev = endToNextOffsetMapping.put(partitionInfo.f0, partitionInfo.f1);
    +				Preconditions.checkState(prev == null,
    +						getClass().getSimpleName() + " : Duplicate entry when restoring.");
     			}
     		} else {
    -			// the first time the job is executed
    -
    -			final int stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
     			final int taskIdx = getRuntimeContext().getIndexOfThisSubtask();
    -			final long congruence = start + taskIdx;
    +			final int parallelTasks = getRuntimeContext().getNumberOfParallelSubtasks();
    +
    +			final long totalElements = Math.abs(end - start + 1L);
    +			final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
    +			final int totalPartitions = totalElements < Integer.MAX_VALUE ? Math.min(maxParallelism, (int) totalElements) : maxParallelism;
     
    -			long totalNoOfElements = Math.abs(end - start + 1);
    -			final int baseSize = safeDivide(totalNoOfElements, stepSize);
    -			final int toCollect = (totalNoOfElements % stepSize > taskIdx) ? baseSize + 1 : baseSize;
    +			Tuple2<Integer, Integer> localPartitionRange = getLocalRange(totalPartitions, parallelTasks, taskIdx);
    +			int localStartIdx = localPartitionRange.f0;
    +			int localEndIdx = localStartIdx + localPartitionRange.f1;
     
    -			for (long collected = 0; collected < toCollect; collected++) {
    -				this.valuesToEmit.add(collected * stepSize + congruence);
    +			for (int partIdx = localStartIdx; partIdx < localEndIdx; partIdx++) {
    +				Tuple2<Long, Long> limits = getPartitionLimits(totalElements, totalPartitions, partIdx);
    +				endToNextOffsetMapping.put(limits.f1, limits.f0);
     			}
     		}
     	}
     
    +	private Tuple2<Integer, Integer> getLocalRange(int totalPartitions, int parallelTasks, int taskIdx) {
    +		int minPartitionSliceSize = totalPartitions / parallelTasks;
    +		int remainingPartitions = totalPartitions - minPartitionSliceSize * parallelTasks;
    +
    +		int localRangeStartIdx = taskIdx * minPartitionSliceSize + Math.min(taskIdx, remainingPartitions);
    +		int localRangeSize = taskIdx < remainingPartitions ? minPartitionSliceSize + 1 : minPartitionSliceSize;
    +
    +		return new Tuple2<>(localRangeStartIdx, localRangeSize);
    +	}
    +
    +	private Tuple2<Long, Long> getPartitionLimits(long totalElements, int totalPartitions, long partitionIdx) {
    +		long minElementPartitionSize = totalElements / totalPartitions;
    +		long remainingElements = totalElements - minElementPartitionSize * totalPartitions;
    +		long startOffset = start;
    +
    +		for (int idx = 0; idx < partitionIdx; idx++) {
    +			long partitionSize = idx < remainingElements ? minElementPartitionSize + 1L : minElementPartitionSize;
    +			startOffset += partitionSize;
    +		}
    +
    +		long partitionSize = partitionIdx < remainingElements ? minElementPartitionSize + 1L : minElementPartitionSize;
    +		return new Tuple2<>(startOffset, startOffset + partitionSize);
    +	}
    +
     	@Override
     	public void run(SourceContext<Long> ctx) throws Exception {
    -		while (isRunning && !this.valuesToEmit.isEmpty()) {
    -			synchronized (ctx.getCheckpointLock()) {
    -				ctx.collect(this.valuesToEmit.poll());
    +		for (Map.Entry<Long, Long> partition: endToNextOffsetMapping.entrySet()) {
    --- End diff --
    
    nit: spaces should surround the ":" (i.e. "partition : endToNextOffsetMapping.entrySet()")
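    
    For readers following the splitting logic above, here is a minimal standalone sketch (not Flink code; the class name, the main() driver, and its example values are illustrative) of the two-level split in the diff: the partition count is capped at min(maxParallelism, totalElements), getLocalRange() assigns each subtask a contiguous slice of partitions, and getPartitionLimits() maps each partition to a [startOffset, endOffset) slice of the sequence, with remainders spread over the first subtasks/partitions:
    
    public class SequenceSplitSketch {
    
        /** Assign subtask taskIdx a contiguous slice of partitions; the first 'remaining' subtasks get one extra. */
        static int[] getLocalRange(int totalPartitions, int parallelTasks, int taskIdx) {
            int minSlice = totalPartitions / parallelTasks;
            int remaining = totalPartitions - minSlice * parallelTasks;
            int startIdx = taskIdx * minSlice + Math.min(taskIdx, remaining);
            int size = taskIdx < remaining ? minSlice + 1 : minSlice;
            return new int[] { startIdx, size };
        }
    
        /** Map partitionIdx to its [startOffset, endOffset) slice of the global sequence. */
        static long[] getPartitionLimits(long start, long totalElements, int totalPartitions, int partitionIdx) {
            long minSize = totalElements / totalPartitions;
            long remaining = totalElements - minSize * totalPartitions;
            long offset = start;
            for (int idx = 0; idx < partitionIdx; idx++) {
                offset += idx < remaining ? minSize + 1L : minSize;
            }
            long size = partitionIdx < remaining ? minSize + 1L : minSize;
            return new long[] { offset, offset + size };
        }
    
        public static void main(String[] args) {
            // Illustrative example: sequence 0..9 (10 elements), 4 partitions, 3 parallel subtasks.
            long start = 0L;
            long totalElements = 10L;
            int totalPartitions = 4;
            int parallelTasks = 3;
            for (int taskIdx = 0; taskIdx < parallelTasks; taskIdx++) {
                int[] range = getLocalRange(totalPartitions, parallelTasks, taskIdx);
                for (int p = range[0]; p < range[0] + range[1]; p++) {
                    long[] limits = getPartitionLimits(start, totalElements, totalPartitions, p);
                    System.out.printf("task %d -> partition %d: [%d, %d)%n", taskIdx, p, limits[0], limits[1]);
                }
            }
            // task 0 -> partitions 0 and 1: [0, 3) and [3, 6)
            // task 1 -> partition 2: [6, 8)
            // task 2 -> partition 3: [8, 10)
        }
    }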


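    The restore path can be exercised the same way. A sketch, assuming hypothetical checkpoint entries (per the diff, the state is a list of Tuple2<partitionEnd, nextOffset> pairs, rebuilt into a map with a duplicate check on restore):
    
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    public class RestoreSketch {
        public static void main(String[] args) {
            // Hypothetical restored entries, each { partitionEnd, nextOffset }.
            List<long[]> restored = Arrays.asList(new long[] { 6L, 4L }, new long[] { 10L, 8L });
            Map<Long, Long> endToNextOffsetMapping = new HashMap<>();
            for (long[] entry : restored) {
                Long prev = endToNextOffsetMapping.put(entry[0], entry[1]);
                if (prev != null) {
                    throw new IllegalStateException("Duplicate entry when restoring.");
                }
            }
            System.out.println(endToNextOffsetMapping); // {6=4, 10=8}
        }
    }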