You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2019/03/20 20:57:31 UTC
[beam] branch master updated: [flink] debug logging for state access
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a01f1a1 [flink] debug logging for state access
a01f1a1 is described below
commit a01f1a15e00e042137409222580147f5b9218d75
Author: Thomas Weise <th...@apache.org>
AuthorDate: Wed Mar 13 21:41:07 2019 -0700
[flink] debug logging for state access
---
.../streaming/ExecutableStageDoFnOperator.java | 24 ++++++++++++++++++++++
1 file changed, 24 insertions(+)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 8b0685b..210da7d 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -276,6 +276,14 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
stateBackendLock.lock();
prepareStateBackend(key, keyCoder);
StateNamespace namespace = StateNamespaces.window(windowCoder, window);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "State get for {} {} {} {}",
+ pTransformId,
+ userStateId,
+ Arrays.toString(keyedStateBackend.getCurrentKey().array()),
+ window);
+ }
BagState<V> bagState =
stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder));
return bagState.read();
@@ -290,6 +298,14 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
stateBackendLock.lock();
prepareStateBackend(key, keyCoder);
StateNamespace namespace = StateNamespaces.window(windowCoder, window);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "State append for {} {} {} {}",
+ pTransformId,
+ userStateId,
+ Arrays.toString(keyedStateBackend.getCurrentKey().array()),
+ window);
+ }
BagState<V> bagState =
stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder));
while (values.hasNext()) {
@@ -306,6 +322,14 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
stateBackendLock.lock();
prepareStateBackend(key, keyCoder);
StateNamespace namespace = StateNamespaces.window(windowCoder, window);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "State clear for {} {} {} {}",
+ pTransformId,
+ userStateId,
+ Arrays.toString(keyedStateBackend.getCurrentKey().array()),
+ window);
+ }
BagState<V> bagState =
stateInternals.state(namespace, StateTags.bag(userStateId, valueCoder));
bagState.clear();