You are viewing a plain-text version of this content. The link to the canonical version was lost in this plain-text copy.
Posted to commits@beam.apache.org by th...@apache.org on 2019/01/20 00:45:03 UTC
[beam] branch master updated: [BEAM-6418] Lower memory consumption of Flink integration tests (#7512)
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6006038 [BEAM-6418] Lower memory consumption of Flink integration tests (#7512)
6006038 is described below
commit 6006038e5fa537260d44381b4cd11cc7e6310856
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Sat Jan 19 19:44:53 2019 -0500
[BEAM-6418] Lower memory consumption of Flink integration tests (#7512)
---
.../apache/beam/runners/flink/TestFlinkRunner.java | 4 ++++
.../beam/runners/flink/PortableExecutionTest.java | 28 +++++++++++++++-------
.../runners/flink/PortableStateExecutionTest.java | 28 +++++++++++++++-------
.../runners/flink/PortableTimersExecutionTest.java | 28 +++++++++++++++-------
.../flink/streaming/GroupByWithNullValuesTest.java | 4 ++--
5 files changed, 66 insertions(+), 26 deletions(-)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
index cefb161..2344f97 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
@@ -33,6 +33,10 @@ public class TestFlinkRunner extends PipelineRunner<PipelineResult> {
private TestFlinkRunner(FlinkPipelineOptions options) {
options.setShutdownSourcesOnFinalWatermark(true);
+ if (options.getParallelism() == -1) {
+ // Limit parallelism to avoid too much memory consumption during local execution
+ options.setParallelism(1);
+ }
this.delegate = FlinkRunner.fromOptions(options);
}
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
index 367fee1..aa3a29c 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.flink;
import java.io.Serializable;
import java.util.Collections;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.Environments;
@@ -44,13 +45,15 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Tests the execution of a pipeline from specification to execution on the portable Flink runner.
@@ -60,6 +63,8 @@ import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class PortableExecutionTest implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(PortableExecutionTest.class);
+
@Parameters(name = "streaming: {0}")
public static Object[] data() {
return new Object[] {true, false};
@@ -67,16 +72,23 @@ public class PortableExecutionTest implements Serializable {
@Parameter public boolean isStreaming;
- private transient ListeningExecutorService flinkJobExecutor;
+ private static ListeningExecutorService flinkJobExecutor;
- @Before
- public void setup() {
- flinkJobExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+ @BeforeClass
+ public static void setup() {
+ // Restrict this to only one thread to avoid multiple Flink clusters up at the same time
+ // which is not suitable for memory-constrained environments, e.g. Jenkins.
+ flinkJobExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
}
- @After
- public void tearDown() {
+ @AfterClass
+ public static void tearDown() throws InterruptedException {
flinkJobExecutor.shutdown();
+ flinkJobExecutor.awaitTermination(10, TimeUnit.SECONDS);
+ if (!flinkJobExecutor.isShutdown()) {
+ LOG.warn("Could not shutdown Flink job executor");
+ }
+ flinkJobExecutor = null;
}
@Test(timeout = 120_000)
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
index 036bd93..afd0abe 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.flink;
import java.io.Serializable;
import java.util.Collections;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.Environments;
@@ -42,13 +43,15 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Tests the State server integration of {@link
@@ -57,6 +60,8 @@ import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class PortableStateExecutionTest implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(PortableStateExecutionTest.class);
+
@Parameters(name = "streaming: {0}")
public static Object[] data() {
return new Object[] {true, false};
@@ -64,16 +69,23 @@ public class PortableStateExecutionTest implements Serializable {
@Parameter public boolean isStreaming;
- private transient ListeningExecutorService flinkJobExecutor;
+ private static ListeningExecutorService flinkJobExecutor;
- @Before
- public void setup() {
- flinkJobExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+ @BeforeClass
+ public static void setup() {
+ // Restrict this to only one thread to avoid multiple Flink clusters up at the same time
+ // which is not suitable for memory-constrained environments, e.g. Jenkins.
+ flinkJobExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
}
- @After
- public void tearDown() {
+ @AfterClass
+ public static void tearDown() throws InterruptedException {
flinkJobExecutor.shutdown();
+ flinkJobExecutor.awaitTermination(10, TimeUnit.SECONDS);
+ if (!flinkJobExecutor.isShutdown()) {
+ LOG.warn("Could not shutdown Flink job executor");
+ }
+ flinkJobExecutor = null;
}
// Special values which clear / write out state
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
index 10b9fb5..3133c73 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.Environments;
@@ -52,13 +53,15 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Tests the state and timer integration of {@link
@@ -67,6 +70,8 @@ import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class PortableTimersExecutionTest implements Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(PortableTimersExecutionTest.class);
+
@Parameters(name = "streaming: {0}")
public static Object[] testModes() {
return new Object[] {true, false};
@@ -74,16 +79,23 @@ public class PortableTimersExecutionTest implements Serializable {
@Parameter public boolean isStreaming;
- private transient ListeningExecutorService flinkJobExecutor;
+ private static ListeningExecutorService flinkJobExecutor;
- @Before
- public void setup() {
- flinkJobExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+ @BeforeClass
+ public static void setup() {
+ // Restrict this to only one thread to avoid multiple Flink clusters up at the same time
+ // which is not suitable for memory-constrained environments, e.g. Jenkins.
+ flinkJobExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
}
- @After
- public void tearDown() {
+ @AfterClass
+ public static void tearDown() throws InterruptedException {
flinkJobExecutor.shutdown();
+ flinkJobExecutor.awaitTermination(10, TimeUnit.SECONDS);
+ if (!flinkJobExecutor.isShutdown()) {
+ LOG.warn("Could not shutdown Flink job executor");
+ }
+ flinkJobExecutor = null;
}
@Test(timeout = 120_000)
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByWithNullValuesTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByWithNullValuesTest.java
index 7411c94..19e544e 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByWithNullValuesTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByWithNullValuesTest.java
@@ -21,7 +21,7 @@ import static org.junit.Assert.assertNull;
import java.io.Serializable;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
-import org.apache.beam.runners.flink.FlinkRunner;
+import org.apache.beam.runners.flink.TestFlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -45,7 +45,7 @@ public class GroupByWithNullValuesTest implements Serializable {
public void testGroupByWithNullValues() {
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
- options.setRunner(FlinkRunner.class);
+ options.setRunner(TestFlinkRunner.class);
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);