Posted to issues@flink.apache.org by makeyang <gi...@git.apache.org> on 2018/04/25 09:51:36 UTC

[GitHub] flink pull request #5908: [FLINK-9182]async checkpoints for timer service

GitHub user makeyang opened a pull request:

    https://github.com/apache/flink/pull/5908

    [FLINK-9182]async checkpoints for timer service

    ## What is the purpose of the change
    This PR is a work in progress: the unit tests marked as TODO still need to be finished.
    It is opened to collect feedback on a proposed solution for FLINK-9182.
    
    ## Brief change log
    1. Add one more state, called rawkeyedstatemeta, which helps store the version info of the timer service and the timer size.
    2. Make the timer state snapshot asynchronous.
    
    ## Does this pull request potentially affect one of the following parts:
        Dependencies (does it add or upgrade a dependency): (yes / **no**)
        The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / **no**)
        The serializers: (yes / **no** / don't know)
        The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
        Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
        The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
        Does this pull request introduce a new feature? (yes / **no**)
        If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
    
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/makeyang/flink FLINK-9182

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5908.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5908
    
----
commit 7eca3bebd2b92ffb53a2058d10df966ffd3d4875
Author: makeyang <ma...@...>
Date:   2018-04-25T09:24:26Z

    [FLINK-9182]async checkpoints for timer service
    There are some unit tests that still need fixing, which I marked as TODO.
    The package has passed the integration tests in my test environment, so please take a look at the code to verify the initial approach first.

----


---

[GitHub] flink pull request #5908: [FLINK-9182]async checkpoints for timer service

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5908#discussion_r185749819
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -222,29 +228,53 @@ public void registerProcessingTimeTimer(N namespace, long time) {
     
     	@Override
     	public void registerEventTimeTimer(N namespace, long time) {
    -		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    -		Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
    -		if (timerSet.add(timer)) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace,
    +			this.knInternalTimeServiceManager.getStateTableVersion().intValue(), -1);
    +		Map<String, InternalTimer<K, N>> timerMap = getEventTimeTimerSetForTimer((K) keyContext.getCurrentKey());
    +		InternalTimer<K, N> prev = timerMap.put(timer.buildHashKey(), timer);
    +		if (prev == null) {
    --- End diff --
    
    What happens if we find a `prev != null` that was marked as deleted? It looks like no timer will be inserted, even though one should be.
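A minimal sketch of the fix the review points at, assuming a simplified timer whose `deleteVersion` of `-1` means "live" (the PR's exact encoding is not visible in the hunk). `RegisterSketch`, `SimpleTimer`, and `isDeleted()` are hypothetical names, not Flink API. Registration treats a tombstoned entry like a missing one, so the timer is enqueued again.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;

public class RegisterSketch {
    // Hypothetical, simplified stand-in for the PR's InternalTimer.
    static final class SimpleTimer implements Comparable<SimpleTimer> {
        final long timestamp;
        final String key;
        final String namespace;
        // Assumption: -1 means "live"; any other value is the version at which it was deleted.
        volatile int deleteVersion = -1;

        SimpleTimer(long timestamp, String key, String namespace) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
        }

        boolean isDeleted() {
            return deleteVersion != -1;
        }

        @Override
        public int compareTo(SimpleTimer other) {
            return Long.compare(timestamp, other.timestamp);
        }
    }

    final Map<String, SimpleTimer> timerMap = new HashMap<>();
    final PriorityQueue<SimpleTimer> queue = new PriorityQueue<>();

    static String hashKey(String key, String namespace, long time) {
        return key + ":" + namespace + ":" + time;
    }

    void register(String key, String namespace, long time) {
        String id = hashKey(key, namespace, time);
        SimpleTimer prev = timerMap.get(id);
        // Only a live duplicate suppresses the insert; a tombstone counts as absent.
        if (prev == null || prev.isDeleted()) {
            SimpleTimer timer = new SimpleTimer(time, key, namespace);
            timerMap.put(id, timer);
            queue.add(timer);
        }
    }

    void delete(String key, String namespace, long time, int version) {
        SimpleTimer timer = timerMap.get(hashKey(key, namespace, time));
        if (timer != null && !timer.isDeleted()) {
            timer.deleteVersion = version; // entry stays in the map as a tombstone
            queue.remove(timer);
        }
    }

    public static void main(String[] args) {
        RegisterSketch service = new RegisterSketch();
        service.register("k", "ns", 10L);
        service.delete("k", "ns", 10L, 3);
        service.register("k", "ns", 10L);
        System.out.println(service.queue.size()); // prints 1
    }
}
```

With the diff's `if (prev == null)` check alone, the second `register` call would leave the queue empty. Looking the entry up before inserting also avoids replacing a live map entry with a new instance that the queue does not contain.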


---

[GitHub] flink pull request #5908: [FLINK-9182]async checkpoints for timer service

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5908#discussion_r185729732
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java ---
    @@ -39,11 +40,18 @@
     	private final long timestamp;
     	private final K key;
     	private final N namespace;
    +	private final String hashKey;
    --- End diff --
    
    Why do we need this string key instead of just using the normal hash code? If the answer is that you want to use it in a `ConcurrentHashMap`, there are other ways to accomplish this.


---

[GitHub] flink pull request #5908: [FLINK-9182]async checkpoints for timer service

Posted by makeyang <gi...@git.apache.org>.
Github user makeyang closed the pull request at:

    https://github.com/apache/flink/pull/5908


---

[GitHub] flink pull request #5908: [FLINK-9182]async checkpoints for timer service

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5908#discussion_r185730033
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java ---
    @@ -97,6 +105,32 @@ public String toString() {
     				'}';
     	}
     
    +	public String buildHashKey() {
    +		return this.hashKey;
    +	}
    +
    +	public static String buildHashKey(String key, String namespace, long timestamp) {
    +		return key + ":" + namespace + ":" + timestamp;
    +	}
    +
    +	public void markDelete(int deleteVersion) {
    +		synchronized (this.deleteVersion) {
    --- End diff --
    
    What is the use of this synchronization on a non-final field that cannot be achieved with a volatile field?
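A minimal sketch of the volatile alternative, assuming the delete marker is a single `int` written by the processing thread and read by the snapshot thread. As a general Java point, `synchronized (this.deleteVersion)` locks whichever object the field references at that moment; if the field is a boxed `Integer` that gets reassigned, two threads can end up locking different objects. `VolatileDeleteMarkerSketch` and its methods are hypothetical.

```java
public class VolatileDeleteMarkerSketch {
    // Hypothetical stand-in for the PR's deleteVersion field; -1 means "not deleted".
    // A volatile write is visible to any thread that later reads the field.
    private volatile int deleteVersion = -1;

    public void markDelete(int version) {
        deleteVersion = version;
    }

    public boolean isDeleted() {
        return deleteVersion != -1;
    }

    public int getDeleteVersion() {
        return deleteVersion;
    }

    public static void main(String[] args) throws InterruptedException {
        VolatileDeleteMarkerSketch timer = new VolatileDeleteMarkerSketch();
        Thread snapshotReader = new Thread(() -> {
            // Because the field is volatile, this loop is guaranteed to see the write eventually.
            while (!timer.isDeleted()) {
                Thread.yield();
            }
            System.out.println("observed delete at version " + timer.getDeleteVersion());
        });
        snapshotReader.start();
        timer.markDelete(5);
        snapshotReader.join();
    }
}
```

A plain volatile write is enough for a set-once value; a conditional update such as "mark only if not already deleted" would need `AtomicInteger.compareAndSet` instead.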


---

[GitHub] flink issue #5908: [FLINK-9182]async checkpoints for timer service

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5908
  
    Maybe let me add some more. First, about introducing a separate new state handle: our long-term plan is actually to integrate timers more closely with the backends, so that we can also have a timer service in RocksDB. In general, I would aim to reduce the number of handles (e.g. timers could then also be considered managed keyed state) instead of adding more.

    Second, I would probably suggest a simpler model for the async snapshots. You dropped the idea of making flat copies, but I wonder if this was premature. I can see that calling `set.toArray(...)` per key group could (maybe) turn out a bit slow, because it potentially has to iterate and flatten linked entries. However, with async snapshots, we could get rid of the key-group partitioning of sets and instead do a flat copy of the internal array of the priority queue. This would translate to just a single memcopy call internally, which is very efficient. In the async part, we can still partition the timers by key group, similar to how the copy-on-write state table does it. This would avoid slowing down the event-processing path (in fact, it would improve it by unifying the sets) and also keep the approach very straightforward and less invasive.
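A minimal sketch of the suggested model, assuming the priority queue is backed by a plain array. The class, the `keyGroupFor` helper, and the modulo-based key-group assignment are hypothetical simplifications (Flink's real key-group assignment hashes keys differently), and heap sift-up/sift-down is omitted because only the backing array matters for the snapshot. The synchronous part is one `Arrays.copyOf` call; the key-group partitioning runs asynchronously on the copy.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FlatCopySnapshotSketch {
    // Hypothetical immutable timer, so a shallow array copy is a consistent snapshot.
    static final class Timer {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    // Backing array of the (simplified) priority queue; heap ordering omitted.
    private Timer[] heap = new Timer[16];
    private int size;

    void add(Timer timer) {
        if (size == heap.length) {
            heap = Arrays.copyOf(heap, size * 2);
        }
        heap[size++] = timer;
    }

    // Synchronous part: a single flat copy of the used slots of the array.
    Timer[] snapshotSync() {
        return Arrays.copyOf(heap, size);
    }

    // Asynchronous part: partition the copied timers by key group off the processing thread.
    static CompletableFuture<List<List<Timer>>> partitionAsync(Timer[] copy, int numKeyGroups) {
        return CompletableFuture.supplyAsync(() -> {
            List<List<Timer>> byKeyGroup = new ArrayList<>(numKeyGroups);
            for (int i = 0; i < numKeyGroups; i++) {
                byKeyGroup.add(new ArrayList<>());
            }
            for (Timer timer : copy) {
                byKeyGroup.get(keyGroupFor(timer.key, numKeyGroups)).add(timer);
            }
            return byKeyGroup;
        });
    }

    // Hypothetical key-group assignment used only for this sketch.
    static int keyGroupFor(String key, int numKeyGroups) {
        return Math.floorMod(key.hashCode(), numKeyGroups);
    }

    public static void main(String[] args) throws Exception {
        FlatCopySnapshotSketch queue = new FlatCopySnapshotSketch();
        queue.add(new Timer(10L, "a"));
        queue.add(new Timer(20L, "b"));
        queue.add(new Timer(30L, "c"));
        Timer[] copy = queue.snapshotSync();
        queue.add(new Timer(40L, "d")); // processing continues; the copy is unaffected
        List<List<Timer>> partitions = partitionAsync(copy, 4).get();
        int total = partitions.stream().mapToInt(List::size).sum();
        System.out.println(total); // 3
    }
}
```

Timers added after `snapshotSync()` do not appear in the snapshot, and the processing thread never waits for the partitioning step.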


---

[GitHub] flink issue #5908: [FLINK-9182]async checkpoints for timer service

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/5908
  
    @makeyang can you also please close this PR?


---

[GitHub] flink pull request #5908: [FLINK-9182]async checkpoints for timer service

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5908#discussion_r185729269
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -49,13 +50,13 @@
     	/**
     	 * Processing time timers that are currently in-flight.
     	 */
    -	private final Set<InternalTimer<K, N>>[] processingTimeTimersByKeyGroup;
    +	private final Map<String, InternalTimer<K, N>>[] processingTimeTimersByKeyGroup;
    --- End diff --
    
    It seems that you switched from Set to Map because you want to use `ConcurrentHashMap`, but you can simply have a set that is backed by a `ConcurrentHashMap`, created either with `Collections.newSetFromMap(...)` or `ConcurrentHashMap.newKeySet()`.
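A minimal sketch of both factory methods, assuming a timer class with value-based `equals`/`hashCode` over timestamp, key, and namespace (the `Timer` class here is hypothetical). Either way, the result is a thread-safe `Set` with the usual `add`/`remove` semantics, and no `String` key is needed.

```java
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentTimerSetSketch {
    // Hypothetical timer with value-based identity, mirroring InternalTimer's fields.
    static final class Timer {
        final long timestamp;
        final String key;
        final String namespace;

        Timer(long timestamp, String key, String namespace) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Timer)) return false;
            Timer other = (Timer) o;
            return timestamp == other.timestamp
                && key.equals(other.key)
                && namespace.equals(other.namespace);
        }

        @Override
        public int hashCode() {
            return Objects.hash(timestamp, key, namespace);
        }
    }

    // add, duplicate add, and remove behave exactly as they would on a HashSet.
    static boolean behavesLikeSet(Set<Timer> set) {
        boolean added = set.add(new Timer(10L, "user-1", "window-a"));
        boolean duplicateRejected = !set.add(new Timer(10L, "user-1", "window-a"));
        boolean removed = set.remove(new Timer(10L, "user-1", "window-a"));
        return added && duplicateRejected && removed && set.isEmpty();
    }

    static boolean check() {
        Set<Timer> viaKeySet = ConcurrentHashMap.newKeySet();
        Set<Timer> viaSetFromMap = Collections.newSetFromMap(new ConcurrentHashMap<Timer, Boolean>());
        return behavesLikeSet(viaKeySet) && behavesLikeSet(viaSetFromMap);
    }

    public static void main(String[] args) {
        System.out.println(check()); // true
    }
}
```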


---

[GitHub] flink issue #5908: [FLINK-9182]async checkpoints for timer service

Posted by makeyang <gi...@git.apache.org>.
Github user makeyang commented on the issue:

    https://github.com/apache/flink/pull/5908
  
    @StefanRRichter I definitely like your idea of flat-copying the priority queue in the synchronous part and then handling the key-group partitioning in the asynchronous part. So I'll drop my solution, try to implement yours, and then open a new PR.


---

[GitHub] flink pull request #5908: [FLINK-9182]async checkpoints for timer service

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5908#discussion_r185734334
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -395,36 +402,102 @@ public final OperatorSnapshotFutures snapshotState(long checkpointId, long times
     	 *
     	 * @param context context that provides information and means required for taking a snapshot
     	 */
    -	public void snapshotState(StateSnapshotContext context) throws Exception {
    +	public void snapshotState(StateSnapshotContext context, OperatorSnapshotFutures snapshotInProgress) throws Exception {
     		if (getKeyedStateBackend() != null) {
     			KeyedStateCheckpointOutputStream out;
    -
    +			OperatorStateCheckpointOutputStream metaOut;
     			try {
     				out = context.getRawKeyedOperatorStateOutput();
     			} catch (Exception exception) {
     				throw new Exception("Could not open raw keyed operator state stream for " +
     					getOperatorName() + '.', exception);
     			}
    -
     			try {
    -				KeyGroupsList allKeyGroups = out.getKeyGroupList();
    -				for (int keyGroupIdx : allKeyGroups) {
    -					out.startNewKeyGroup(keyGroupIdx);
    -
    -					timeServiceManager.snapshotStateForKeyGroup(
    -						new DataOutputViewStreamWrapper(out), keyGroupIdx);
    -				}
    +				metaOut = context.getRawKeyedOperatorStateMetaOutput();
     			} catch (Exception exception) {
    -				throw new Exception("Could not write timer service of " + getOperatorName() +
    -					" to checkpoint state stream.", exception);
    -			} finally {
    -				try {
    -					out.close();
    -				} catch (Exception closeException) {
    -					LOG.warn("Could not close raw keyed operator state stream for {}. This " +
    -						"might have prevented deleting some state data.", getOperatorName(), closeException);
    -				}
    +				throw new Exception("Could not open raw operator state stream for " +
    +					getOperatorName() + '.', exception);
     			}
    +			final Tuple4<Integer, Map<String, HeapInternalTimerService>, Integer, TreeSet<Integer>> ret = timeServiceManager.startOneSnapshotState();
    +			final int currentSnapshotVersion = ret.f0;
    +			final Map<String, HeapInternalTimerService> timerServices = ret.f1;
    +			final Integer stateTableVersion = ret.f2;
    +			final TreeSet<Integer> snapshotVersions = ret.f3;
    +			LOG.info("snapshotVersions after calling startOneSnapshotState:" + snapshotVersions.toString());
    +			Callable<Boolean> snapshotTimerCallable = new Callable() {
    +				@Override
    +				public Boolean call() {
    +					try {
    +						KeyGroupsList allKeyGroups = out.getKeyGroupList();
    +						metaOut.startNewPartition();
    +						DataOutputViewStreamWrapper metaWrapper = new DataOutputViewStreamWrapper(metaOut);
    +						metaWrapper.writeInt(stateTableVersion);
    +						if (snapshotVersions.size() > 0) {
    +							metaWrapper.writeInt(snapshotVersions.size());
    +							for (Integer i : snapshotVersions) {
    +								metaWrapper.writeInt(i);
    +							}
    +						}
    +						else {
    +							metaWrapper.writeInt(0);
    +						}
    +						int keyGroupCount = allKeyGroups.getNumberOfKeyGroups();
    +						metaWrapper.writeInt(keyGroupCount);
    +						for (int keyGroupIdx : allKeyGroups) {
    +							out.startNewKeyGroup(keyGroupIdx);
    +							metaWrapper.writeInt(keyGroupIdx);
    +							InternalTimerServiceSerializationProxy serializationProxy =
    +								new InternalTimerServiceSerializationProxy(timerServices, keyGroupIdx,
    +									currentSnapshotVersion, timeServiceManager, metaWrapper);
    +
    +							serializationProxy.write(new DataOutputViewStreamWrapper(out));
    +
    +						}
    +						LOG.info("return Tuple4 and snapshotVersions:" + snapshotVersions.toString());
    +						return true;
    +					} catch (Exception exception) {
    +						LOG.error("Could not write timer service of " + getOperatorName() +
    +							" to checkpoint state stream.", exception);
    +						return false;
    +					} finally {
    +						timeServiceManager.stopOneSnapshotState(currentSnapshotVersion);
    +						StateSnapshotContextSynchronousImpl snapshotContext = (StateSnapshotContextSynchronousImpl) context;
    +						try {
    +							snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
    +						} catch (IOException e) {
    +							LOG.warn("setKeyedStateRawFuture in callable excpetion", e);
    +							return false;
    --- End diff --
    
    Putting returns inside a finally block here is a bad idea because this can swallow and hide `Throwables`.
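A minimal sketch of the failure mode, assuming a callable shaped like the one in the diff: a `catch (Exception)` plus a `finally` block whose own error handling returns. An `Error` thrown by the work is not caught by `catch (Exception)`, so it is still propagating when `finally` runs, and the `return` in `finally` discards it. The method names are hypothetical.

```java
import java.io.IOException;

public class ReturnInFinallySketch {
    // Problematic shape: the return inside finally overrides any pending Throwable.
    static boolean swallowing(Runnable work, boolean cleanupFails) {
        try {
            work.run();
            return true;
        } catch (Exception e) {
            return false;
        } finally {
            try {
                cleanup(cleanupFails);
            } catch (IOException e) {
                return false; // discards an Error still propagating from the try block
            }
        }
    }

    // Safer shape: report cleanup problems without returning from finally.
    static boolean propagating(Runnable work, boolean cleanupFails) {
        try {
            work.run();
            return true;
        } catch (Exception e) {
            return false;
        } finally {
            try {
                cleanup(cleanupFails);
            } catch (IOException e) {
                System.err.println("cleanup failed: " + e.getMessage());
            }
        }
    }

    static void cleanup(boolean fail) throws IOException {
        if (fail) {
            throw new IOException("simulated cleanup failure");
        }
    }

    public static void main(String[] args) {
        Runnable fatal = () -> { throw new Error("simulated fatal error"); };
        System.out.println(swallowing(fatal, true)); // false: the Error silently disappears
        try {
            propagating(fatal, true);
        } catch (Error e) {
            System.out.println("propagated: " + e.getMessage());
        }
    }
}
```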


---

[GitHub] flink pull request #5908: [FLINK-9182]async checkpoints for timer service

Posted by makeyang <gi...@git.apache.org>.
Github user makeyang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5908#discussion_r187297365
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---
    @@ -395,36 +402,102 @@ public final OperatorSnapshotFutures snapshotState(long checkpointId, long times
    +					} finally {
    +						timeServiceManager.stopOneSnapshotState(currentSnapshotVersion);
    +						StateSnapshotContextSynchronousImpl snapshotContext = (StateSnapshotContextSynchronousImpl) context;
    +						try {
    +							snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
    +						} catch (IOException e) {
    +							LOG.warn("setKeyedStateRawFuture in callable excpetion", e);
    +							return false;
    --- End diff --
    
    How about changing the return type from Boolean to a Tuple that contains the Throwable when an exception happens?
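A minimal sketch of the suggested return type, assuming a small hypothetical `SnapshotResult` holder in place of a Flink tuple. The second half of `main` shows the standard-library alternative: `Callable.call()` is declared to throw `Exception`, so the task can simply throw, and `Future.get()` rethrows the cause wrapped in an `ExecutionException`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SnapshotResultSketch {
    // Hypothetical replacement for Callable<Boolean>: a success flag plus the failure cause.
    static final class SnapshotResult {
        final boolean success;
        final Throwable failure; // null on success

        private SnapshotResult(boolean success, Throwable failure) {
            this.success = success;
            this.failure = failure;
        }

        static SnapshotResult ok() {
            return new SnapshotResult(true, null);
        }

        static SnapshotResult failed(Throwable t) {
            return new SnapshotResult(false, t);
        }
    }

    static Callable<SnapshotResult> snapshotTask(Runnable writeTimers) {
        return () -> {
            try {
                writeTimers.run();
                return SnapshotResult.ok();
            } catch (Exception e) {
                return SnapshotResult.failed(e); // keep the cause instead of collapsing it to false
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Runnable failingWrite = () -> { throw new IllegalStateException("stream closed"); };
            SnapshotResult result = pool.submit(snapshotTask(failingWrite)).get();
            System.out.println(result.success + " " + result.failure.getMessage()); // false stream closed

            // Alternative: let call() throw; get() rethrows it wrapped in ExecutionException.
            Callable<Boolean> throwing = () -> { throw new IllegalStateException("stream closed"); };
            pool.submit(throwing).get();
        } catch (ExecutionException e) {
            System.out.println("cause: " + e.getCause().getMessage()); // cause: stream closed
        } finally {
            pool.shutdown();
        }
    }
}
```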


---

[GitHub] flink pull request #5908: [FLINK-9182]async checkpoints for timer service

Posted by makeyang <gi...@git.apache.org>.
Github user makeyang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5908#discussion_r187296892
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java ---
    @@ -97,6 +105,32 @@ public String toString() {
     				'}';
     	}
     
    +	public String buildHashKey() {
    +		return this.hashKey;
    +	}
    +
    +	public static String buildHashKey(String key, String namespace, long timestamp) {
    +		return key + ":" + namespace + ":" + timestamp;
    +	}
    +
    +	public void markDelete(int deleteVersion) {
    +		synchronized (this.deleteVersion) {
    --- End diff --
    
    I agree with you that this synchronization can be replaced by a volatile field.


---

[GitHub] flink issue #5908: [FLINK-9182]async checkpoints for timer service

Posted by makeyang <gi...@git.apache.org>.
Github user makeyang commented on the issue:

    https://github.com/apache/flink/pull/5908
  
    @StefanRRichter should the JIRA issue be closed too?


---

[GitHub] flink issue #5908: [FLINK-9182]async checkpoints for timer service

Posted by makeyang <gi...@git.apache.org>.
Github user makeyang commented on the issue:

    https://github.com/apache/flink/pull/5908
  
    @StefanRRichter sorry for the late answer. I had surgery a few days ago and have just come back.
    I'll take a close look at your comments and then reply.


---

[GitHub] flink pull request #5908: [FLINK-9182]async checkpoints for timer service

Posted by makeyang <gi...@git.apache.org>.
Github user makeyang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5908#discussion_r187294295
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java ---
    @@ -39,11 +40,18 @@
     	private final long timestamp;
     	private final K key;
     	private final N namespace;
    +	private final String hashKey;
    --- End diff --
    
    same as above


---

[GitHub] flink pull request #5908: [FLINK-9182]async checkpoints for timer service

Posted by makeyang <gi...@git.apache.org>.
Github user makeyang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5908#discussion_r187294255
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -49,13 +50,13 @@
     	/**
     	 * Processing time timers that are currently in-flight.
     	 */
    -	private final Set<InternalTimer<K, N>>[] processingTimeTimersByKeyGroup;
    +	private final Map<String, InternalTimer<K, N>>[] processingTimeTimersByKeyGroup;
    --- End diff --
    
    @StefanRRichter 
    I switched from Set to Map because in the deleteProcessingTimeTimer/deleteEventTimeTimer methods I have to check whether there is a timer with the current key, namespace, and time, while I don't have the version info.
    If I used Set as the interface, I would have to iterate over the set to fetch the exact timer object, which is not as convenient as a Map.
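A minimal sketch of one way to get the stored instance back without building a `String` key, assuming the version fields are excluded from `equals`/`hashCode` so that a probe built from key, namespace, and time matches the stored timer. The class and field names are hypothetical. Because the map is keyed by the timer itself, `get(probe)` returns the exact stored object, which a `Set` cannot do.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class SelfKeyedTimerMapSketch {
    // Hypothetical timer: identity is (timestamp, key, namespace); the version fields
    // are mutable bookkeeping and deliberately excluded from equals/hashCode.
    static final class Timer {
        final long timestamp;
        final String key;
        final String namespace;
        final int createVersion;
        volatile int deleteVersion = -1;

        Timer(long timestamp, String key, String namespace, int createVersion) {
            this.timestamp = timestamp;
            this.key = key;
            this.namespace = namespace;
            this.createVersion = createVersion;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Timer)) return false;
            Timer other = (Timer) o;
            return timestamp == other.timestamp
                && key.equals(other.key)
                && namespace.equals(other.namespace);
        }

        @Override
        public int hashCode() {
            return Objects.hash(timestamp, key, namespace);
        }
    }

    // Keyed by the timer itself, so get() can hand back the stored instance.
    final Map<Timer, Timer> timers = new ConcurrentHashMap<>();

    void register(Timer timer) {
        timers.put(timer, timer);
    }

    Timer lookup(String key, String namespace, long time) {
        // The probe's version is irrelevant because equals/hashCode ignore it.
        return timers.get(new Timer(time, key, namespace, -1));
    }

    public static void main(String[] args) {
        SelfKeyedTimerMapSketch service = new SelfKeyedTimerMapSketch();
        service.register(new Timer(10L, "user-1", "window-a", 7));
        Timer stored = service.lookup("user-1", "window-a", 10L);
        System.out.println(stored.createVersion); // 7
    }
}
```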


---

[GitHub] flink pull request #5908: [FLINK-9182]async checkpoints for timer service

Posted by StefanRRichter <gi...@git.apache.org>.
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5908#discussion_r185749320
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java ---
    @@ -222,29 +228,53 @@ public void registerProcessingTimeTimer(N namespace, long time) {
     
     	@Override
     	public void registerEventTimeTimer(N namespace, long time) {
    -		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    -		Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
    -		if (timerSet.add(timer)) {
    +		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace,
    +			this.knInternalTimeServiceManager.getStateTableVersion().intValue(), -1);
    +		Map<String, InternalTimer<K, N>> timerMap = getEventTimeTimerSetForTimer((K) keyContext.getCurrentKey());
    +		InternalTimer<K, N> prev = timerMap.put(timer.buildHashKey(), timer);
    +		if (prev == null) {
     			eventTimeTimersQueue.add(timer);
     		}
     	}
     
     	@Override
     	public void deleteProcessingTimeTimer(N namespace, long time) {
    -		InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace);
    -		Set<InternalTimer<K, N>> timerSet = getProcessingTimeTimerSetForTimer(timer);
    -		if (timerSet.remove(timer)) {
    +		Map<String, InternalTimer<K, N>> timerMap = getProcessingTimeTimerSetForTimer((K) keyContext.getCurrentKey());
    +		String key = InternalTimer.buildHashKey(keyContext.getCurrentKey().toString(), namespace.toString(), time);
    +		InternalTimer<K, N> timer = timerMap.get(key);
    +		if (timer != null) {
    +			timer.markDelete(this.knInternalTimeServiceManager.getStateTableVersion().intValue());
     			processingTimeTimersQueue.remove(timer);
     		}
    +		this.knInternalTimeServiceManager.getReadLock().lock();
    +		try {
    +			if (this.knInternalTimeServiceManager.getSnapshotVersions().size() == 0) {
    +				timerMap.remove(key);
    --- End diff --
    
    If the removal happens while a snapshot is ongoing, it looks like it could take a very long time (until the timer fires) before the timer is truly removed. This could potentially accumulate a lot of deleted timers.
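A minimal sketch of the concern, assuming a single-threaded model of the diff's logic: a delete removes the entry only when no snapshot is running and otherwise leaves a tombstone. The `onSnapshotFinished()` purge is a hypothetical addition, not something in the PR, and it ignores which snapshot version needs which tombstone.

```java
import java.util.HashMap;
import java.util.Map;

public class TombstonePurgeSketch {
    // Value TRUE marks a tombstone (deleted while a snapshot was running).
    private final Map<String, Boolean> timers = new HashMap<>();
    private int runningSnapshots;

    void register(String id) {
        timers.put(id, Boolean.FALSE);
    }

    void delete(String id) {
        if (!timers.containsKey(id)) {
            return;
        }
        if (runningSnapshots == 0) {
            timers.remove(id); // no snapshot can still need this entry
        } else {
            timers.put(id, Boolean.TRUE); // an ongoing snapshot may still read it
        }
    }

    void onSnapshotStarted() {
        runningSnapshots++;
    }

    void onSnapshotFinished() {
        runningSnapshots--;
        if (runningSnapshots == 0) {
            // Hypothetical purge: drop all tombstones once no snapshot needs them.
            timers.values().removeIf(Boolean::booleanValue);
        }
    }

    int size() {
        return timers.size();
    }

    public static void main(String[] args) {
        TombstonePurgeSketch service = new TombstonePurgeSketch();
        for (int i = 0; i < 1000; i++) service.register("t" + i);
        service.onSnapshotStarted();
        for (int i = 0; i < 1000; i++) service.delete("t" + i);
        System.out.println(service.size()); // 1000: every delete left a tombstone
        service.onSnapshotFinished();
        System.out.println(service.size()); // 0: purged once the snapshot completed
    }
}
```

In the scenario the comment describes, without some purge step like this the tombstones would stay in the map until each timer's time is reached.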


---