You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2019/01/20 00:45:03 UTC

[beam] branch master updated: [BEAM-6418] Lower memory consumption of Flink integration tests (#7512)

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6006038  [BEAM-6418] Lower memory consumption of Flink integration tests (#7512)
6006038 is described below

commit 6006038e5fa537260d44381b4cd11cc7e6310856
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Sat Jan 19 19:44:53 2019 -0500

    [BEAM-6418] Lower memory consumption of Flink integration tests (#7512)
---
 .../apache/beam/runners/flink/TestFlinkRunner.java |  4 ++++
 .../beam/runners/flink/PortableExecutionTest.java  | 28 +++++++++++++++-------
 .../runners/flink/PortableStateExecutionTest.java  | 28 +++++++++++++++-------
 .../runners/flink/PortableTimersExecutionTest.java | 28 +++++++++++++++-------
 .../flink/streaming/GroupByWithNullValuesTest.java |  4 ++--
 5 files changed, 66 insertions(+), 26 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
index cefb161..2344f97 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
@@ -33,6 +33,10 @@ public class TestFlinkRunner extends PipelineRunner<PipelineResult> {
 
   private TestFlinkRunner(FlinkPipelineOptions options) {
     options.setShutdownSourcesOnFinalWatermark(true);
+    if (options.getParallelism() == -1) {
+      // Limit parallelism to avoid too much memory consumption during local execution
+      options.setParallelism(1);
+    }
     this.delegate = FlinkRunner.fromOptions(options);
   }
 
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
index 367fee1..aa3a29c 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.flink;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.Environments;
@@ -44,13 +45,15 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Tests the execution of a pipeline from specification to execution on the portable Flink runner.
@@ -60,6 +63,8 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class PortableExecutionTest implements Serializable {
 
+  private static final Logger LOG = LoggerFactory.getLogger(PortableExecutionTest.class);
+
   @Parameters(name = "streaming: {0}")
   public static Object[] data() {
     return new Object[] {true, false};
@@ -67,16 +72,23 @@ public class PortableExecutionTest implements Serializable {
 
   @Parameter public boolean isStreaming;
 
-  private transient ListeningExecutorService flinkJobExecutor;
+  private static ListeningExecutorService flinkJobExecutor;
 
-  @Before
-  public void setup() {
-    flinkJobExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+  @BeforeClass
+  public static void setup() {
+    // Restrict this to only one thread to avoid having multiple Flink clusters up at the same
+    // time, which is not suitable for memory-constrained environments, e.g. Jenkins.
+    flinkJobExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
   }
 
-  @After
-  public void tearDown() {
+  @AfterClass
+  public static void tearDown() throws InterruptedException {
     flinkJobExecutor.shutdown();
+    flinkJobExecutor.awaitTermination(10, TimeUnit.SECONDS);
+    if (!flinkJobExecutor.isShutdown()) {
+      LOG.warn("Could not shutdown Flink job executor");
+    }
+    flinkJobExecutor = null;
   }
 
   @Test(timeout = 120_000)
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
index 036bd93..afd0abe 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.flink;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.Environments;
@@ -42,13 +43,15 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Tests the State server integration of {@link
@@ -57,6 +60,8 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class PortableStateExecutionTest implements Serializable {
 
+  private static final Logger LOG = LoggerFactory.getLogger(PortableStateExecutionTest.class);
+
   @Parameters(name = "streaming: {0}")
   public static Object[] data() {
     return new Object[] {true, false};
@@ -64,16 +69,23 @@ public class PortableStateExecutionTest implements Serializable {
 
   @Parameter public boolean isStreaming;
 
-  private transient ListeningExecutorService flinkJobExecutor;
+  private static ListeningExecutorService flinkJobExecutor;
 
-  @Before
-  public void setup() {
-    flinkJobExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+  @BeforeClass
+  public static void setup() {
+    // Restrict this to only one thread to avoid having multiple Flink clusters up at the same
+    // time, which is not suitable for memory-constrained environments, e.g. Jenkins.
+    flinkJobExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
   }
 
-  @After
-  public void tearDown() {
+  @AfterClass
+  public static void tearDown() throws InterruptedException {
     flinkJobExecutor.shutdown();
+    flinkJobExecutor.awaitTermination(10, TimeUnit.SECONDS);
+    if (!flinkJobExecutor.isShutdown()) {
+      LOG.warn("Could not shutdown Flink job executor");
+    }
+    flinkJobExecutor = null;
   }
 
   // Special values which clear / write out state
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
index 10b9fb5..3133c73 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.Environments;
@@ -52,13 +53,15 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Tests the state and timer integration of {@link
@@ -67,6 +70,8 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class PortableTimersExecutionTest implements Serializable {
 
+  private static final Logger LOG = LoggerFactory.getLogger(PortableTimersExecutionTest.class);
+
   @Parameters(name = "streaming: {0}")
   public static Object[] testModes() {
     return new Object[] {true, false};
@@ -74,16 +79,23 @@ public class PortableTimersExecutionTest implements Serializable {
 
   @Parameter public boolean isStreaming;
 
-  private transient ListeningExecutorService flinkJobExecutor;
+  private static ListeningExecutorService flinkJobExecutor;
 
-  @Before
-  public void setup() {
-    flinkJobExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+  @BeforeClass
+  public static void setup() {
+    // Restrict this to only one thread to avoid having multiple Flink clusters up at the same
+    // time, which is not suitable for memory-constrained environments, e.g. Jenkins.
+    flinkJobExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
   }
 
-  @After
-  public void tearDown() {
+  @AfterClass
+  public static void tearDown() throws InterruptedException {
     flinkJobExecutor.shutdown();
+    flinkJobExecutor.awaitTermination(10, TimeUnit.SECONDS);
+    if (!flinkJobExecutor.isShutdown()) {
+      LOG.warn("Could not shutdown Flink job executor");
+    }
+    flinkJobExecutor = null;
   }
 
   @Test(timeout = 120_000)
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByWithNullValuesTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByWithNullValuesTest.java
index 7411c94..19e544e 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByWithNullValuesTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByWithNullValuesTest.java
@@ -21,7 +21,7 @@ import static org.junit.Assert.assertNull;
 
 import java.io.Serializable;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
-import org.apache.beam.runners.flink.FlinkRunner;
+import org.apache.beam.runners.flink.TestFlinkRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -45,7 +45,7 @@ public class GroupByWithNullValuesTest implements Serializable {
   public void testGroupByWithNullValues() {
     FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
 
-    options.setRunner(FlinkRunner.class);
+    options.setRunner(TestFlinkRunner.class);
     options.setStreaming(true);
 
     Pipeline pipeline = Pipeline.create(options);