You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2019/01/16 15:58:11 UTC

[beam] branch release-2.10.0 updated: [BEAM-6440] Fix leakage of timer de-duplication map

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch release-2.10.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.10.0 by this push:
     new 8ab9b55  [BEAM-6440] Fix leakage of timer de-duplication map
     new 7807606  Merge pull request #7530: [BEAM-6440] Fix leakage of timer de-duplication map
8ab9b55 is described below

commit 8ab9b557f15030ff905986036a2364f5dba5c8a2
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Tue Jan 15 12:56:24 2019 -0500

    [BEAM-6440] Fix leakage of timer de-duplication map
    
    The FlinkTimerInternals keep a keyed map of pending timers to work around a
    Flink limitation: a timer can only be deleted by its original timestamp, not
    by its timer id.
    
    The map leaked memory because subclasses of DoFnOperator overrode `fireTimer`,
    which was responsible for cleaning up the map entry when a timer fired.
---
 .../wrappers/streaming/DoFnOperator.java           |  5 ++-
 .../wrappers}/streaming/DedupingOperatorTest.java  |  4 +-
 .../wrappers}/streaming/DoFnOperatorTest.java      |  5 +--
 .../wrappers}/streaming/StreamRecordStripper.java  |  2 +-
 .../streaming/WindowDoFnOperatorTest.java          | 48 +++++++++++++++++++---
 5 files changed, 50 insertions(+), 14 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index dcd0f0b..647d6cc 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -698,12 +698,14 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
   public void onEventTime(InternalTimer<Object, TimerData> timer) throws Exception {
     // We don't have to cal checkInvokeStartBundle() because it's already called in
     // processWatermark*().
+    timerInternals.cleanupPendingTimer(timer.getNamespace());
     fireTimer(timer);
   }
 
   @Override
   public void onProcessingTime(InternalTimer<Object, TimerData> timer) throws Exception {
     checkInvokeStartBundle();
+    timerInternals.cleanupPendingTimer(timer.getNamespace());
     fireTimer(timer);
   }
 
@@ -714,7 +716,6 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
     // This is a user timer, so namespace must be WindowNamespace
     checkArgument(namespace instanceof WindowNamespace);
     BoundedWindow window = ((WindowNamespace) namespace).getWindow();
-    timerInternals.cleanupPendingTimer(timerData);
     pushbackDoFnRunner.onTimer(
         timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
   }
@@ -927,7 +928,7 @@ public class DoFnOperator<InputT, OutputT> extends AbstractStreamOperator<Window
      * namespace of the timer and the timer's id. Necessary for supporting removal of existing
      * timers. In Flink removal of timers can only be done by providing id and time of the timer.
      */
-    private final MapState<String, TimerData> pendingTimersById;
+    final MapState<String, TimerData> pendingTimersById;
 
     private FlinkTimerInternals() {
       MapStateDescriptor<String, TimerData> pendingTimersByIdStateDescriptor =
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DedupingOperatorTest.java
similarity index 95%
rename from runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
rename to runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DedupingOperatorTest.java
index a6fa3db..3a2c4a3 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DedupingOperatorTest.java
@@ -15,9 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.flink.streaming;
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
-import static org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
+import static org.apache.beam.runners.flink.translation.wrappers.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertThat;
 
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
similarity index 99%
rename from runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
rename to runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
index 416595c..ed1630c 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
@@ -15,9 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.flink.streaming;
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
-import static org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
+import static org.apache.beam.runners.flink.translation.wrappers.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertEquals;
@@ -31,7 +31,6 @@ import org.apache.beam.runners.core.StatefulDoFnRunner;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamRecordStripper.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/StreamRecordStripper.java
similarity index 96%
rename from runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamRecordStripper.java
rename to runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/StreamRecordStripper.java
index 26a86a4..c8c7b24 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamRecordStripper.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/StreamRecordStripper.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.flink.streaming;
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.util.WindowedValue;
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
similarity index 83%
rename from runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java
rename to runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
index 56a056f..91114cc 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
@@ -15,17 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.flink.streaming;
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
-import static org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
+import static org.apache.beam.runners.flink.translation.wrappers.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
 import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING;
 import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.ON_TIME;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
 import static org.joda.time.Duration.standardMinutes;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 
 import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
@@ -33,9 +34,6 @@ import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.MultiOutputOutputManagerFactory;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -119,6 +117,44 @@ public class WindowDoFnOperatorTest {
     testHarness.close();
   }
 
+  @Test
+  public void testTimerCleanupOfPendingTimerList() throws Exception {
+    // test harness
+    WindowDoFnOperator<Long, Long, Long> windowDoFnOperator = getWindowDoFnOperator();
+    KeyedOneInputStreamOperatorTestHarness<
+            ByteBuffer, WindowedValue<KeyedWorkItem<Long, Long>>, WindowedValue<KV<Long, Long>>>
+        testHarness = createTestHarness(windowDoFnOperator);
+    testHarness.open();
+
+    DoFnOperator<KeyedWorkItem<Long, Long>, KV<Long, Long>>.FlinkTimerInternals timerInternals =
+        windowDoFnOperator.timerInternals;
+
+    // process elements
+    IntervalWindow window = new IntervalWindow(new Instant(0), Duration.millis(100));
+    IntervalWindow window2 = new IntervalWindow(new Instant(100), Duration.millis(100));
+    testHarness.processWatermark(0L);
+    testHarness.processElement(
+        Item.builder().key(1L).timestamp(1L).value(100L).window(window).build().toStreamRecord());
+    testHarness.processElement(
+        Item.builder()
+            .key(1L)
+            .timestamp(150L)
+            .value(150L)
+            .window(window2)
+            .build()
+            .toStreamRecord());
+
+    assertThat(Iterables.size(timerInternals.pendingTimersById.keys()), is(2));
+
+    // close window
+    testHarness.processWatermark(200L);
+
+    assertThat(Iterables.size(timerInternals.pendingTimersById.keys()), is(0));
+
+    // cleanup
+    testHarness.close();
+  }
+
   private WindowDoFnOperator<Long, Long, Long> getWindowDoFnOperator() {
     WindowingStrategy<Object, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(FixedWindows.of(standardMinutes(1)));