You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by GitBox <gi...@apache.org> on 2019/01/16 02:20:59 UTC

[beam] Diff for: [GitHub] tweise merged pull request #7517: [BEAM-6440] Fix leakage of timer de-duplication map

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index dcd0f0bd7dfe..647d6ccacb0a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -698,12 +698,14 @@ public void snapshotState(StateSnapshotContext context) throws Exception {
   public void onEventTime(InternalTimer<Object, TimerData> timer) throws Exception {
     // We don't have to call checkInvokeStartBundle() because it's already called in
     // processWatermark*().
+    timerInternals.cleanupPendingTimer(timer.getNamespace());
     fireTimer(timer);
   }
 
   @Override
   public void onProcessingTime(InternalTimer<Object, TimerData> timer) throws Exception {
     checkInvokeStartBundle();
+    timerInternals.cleanupPendingTimer(timer.getNamespace());
     fireTimer(timer);
   }
 
@@ -714,7 +716,6 @@ public void fireTimer(InternalTimer<?, TimerData> timer) {
     // This is a user timer, so namespace must be WindowNamespace
     checkArgument(namespace instanceof WindowNamespace);
     BoundedWindow window = ((WindowNamespace) namespace).getWindow();
-    timerInternals.cleanupPendingTimer(timerData);
     pushbackDoFnRunner.onTimer(
         timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
   }
@@ -927,7 +928,7 @@ public TimerInternals timerInternals() {
      * namespace of the timer and the timer's id. Necessary for supporting removal of existing
      * timers. In Flink removal of timers can only be done by providing id and time of the timer.
      */
-    private final MapState<String, TimerData> pendingTimersById;
+    final MapState<String, TimerData> pendingTimersById;
 
     private FlinkTimerInternals() {
       MapStateDescriptor<String, TimerData> pendingTimersByIdStateDescriptor =
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DedupingOperatorTest.java
similarity index 95%
rename from runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
rename to runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DedupingOperatorTest.java
index a6fa3dba6667..3a2c4a376530 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DedupingOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DedupingOperatorTest.java
@@ -15,9 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.flink.streaming;
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
-import static org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
+import static org.apache.beam.runners.flink.translation.wrappers.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertThat;
 
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
similarity index 99%
rename from runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
rename to runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
index 416595cb045d..ed1630c0472a 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
@@ -15,9 +15,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.flink.streaming;
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
-import static org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
+import static org.apache.beam.runners.flink.translation.wrappers.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
 import static org.junit.Assert.assertEquals;
@@ -31,7 +31,6 @@
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamRecordStripper.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/StreamRecordStripper.java
similarity index 96%
rename from runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamRecordStripper.java
rename to runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/StreamRecordStripper.java
index 26a86a4b9f24..c8c7b24924b8 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StreamRecordStripper.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/StreamRecordStripper.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.flink.streaming;
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.util.WindowedValue;
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
similarity index 83%
rename from runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java
rename to runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
index 56a056f0498c..91114cc90a1a 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/WindowDoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperatorTest.java
@@ -15,17 +15,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.flink.streaming;
+package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
-import static org.apache.beam.runners.flink.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
+import static org.apache.beam.runners.flink.translation.wrappers.streaming.StreamRecordStripper.stripStreamRecordFromWindowedValue;
 import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING;
 import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.ON_TIME;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.core.Is.is;
 import static org.joda.time.Duration.standardMinutes;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 
 import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
@@ -33,9 +34,6 @@
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.MultiOutputOutputManagerFactory;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItem;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKeyedWorkItemCoder;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -119,6 +117,44 @@ public void testRestore() throws Exception {
     testHarness.close();
   }
 
+  @Test
+  public void testTimerCleanupOfPendingTimerList() throws Exception {
+    // test harness
+    WindowDoFnOperator<Long, Long, Long> windowDoFnOperator = getWindowDoFnOperator();
+    KeyedOneInputStreamOperatorTestHarness<
+            ByteBuffer, WindowedValue<KeyedWorkItem<Long, Long>>, WindowedValue<KV<Long, Long>>>
+        testHarness = createTestHarness(windowDoFnOperator);
+    testHarness.open();
+
+    DoFnOperator<KeyedWorkItem<Long, Long>, KV<Long, Long>>.FlinkTimerInternals timerInternals =
+        windowDoFnOperator.timerInternals;
+
+    // process elements
+    IntervalWindow window = new IntervalWindow(new Instant(0), Duration.millis(100));
+    IntervalWindow window2 = new IntervalWindow(new Instant(100), Duration.millis(100));
+    testHarness.processWatermark(0L);
+    testHarness.processElement(
+        Item.builder().key(1L).timestamp(1L).value(100L).window(window).build().toStreamRecord());
+    testHarness.processElement(
+        Item.builder()
+            .key(1L)
+            .timestamp(150L)
+            .value(150L)
+            .window(window2)
+            .build()
+            .toStreamRecord());
+
+    assertThat(Iterables.size(timerInternals.pendingTimersById.keys()), is(2));
+
+    // close window
+    testHarness.processWatermark(200L);
+
+    assertThat(Iterables.size(timerInternals.pendingTimersById.keys()), is(0));
+
+    // cleanup
+    testHarness.close();
+  }
+
   private WindowDoFnOperator<Long, Long, Long> getWindowDoFnOperator() {
     WindowingStrategy<Object, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(FixedWindows.of(standardMinutes(1)));


With regards,
Apache Git Services