You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2018/12/21 19:03:11 UTC
[beam] branch master updated: [BEAM-6283] Convert
PortableStateExecutionTest and PortableExecutionTest to using PAssert
This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 845de99 [BEAM-6283] Convert PortableStateExecutionTest and PortableExecutionTest to using PAssert
845de99 is described below
commit 845de99bb3db353fc219333f736bd02bae65a5ad
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Fri Dec 21 20:03:03 2018 +0100
[BEAM-6283] Convert PortableStateExecutionTest and PortableExecutionTest to using PAssert
This fixes a race condition in the tests when writing results to a static map.
---
.../beam/runners/flink/PortableExecutionTest.java | 94 +++++-----
.../runners/flink/PortableStateExecutionTest.java | 193 ++++++++++-----------
.../runners/flink/PortableTimersExecutionTest.java | 2 +-
3 files changed, 136 insertions(+), 153 deletions(-)
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
index 34985d7..9542bdd 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
@@ -17,19 +17,16 @@
*/
package org.apache.beam.runners.flink;
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
+import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.Serializable;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.Executors;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.Environments;
+import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
@@ -39,12 +36,14 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -61,7 +60,7 @@ import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class PortableExecutionTest implements Serializable {
- @Parameters
+ @Parameters(name = "streaming: {0}")
public static Object[] data() {
return new Object[] {true, false};
}
@@ -80,9 +79,7 @@ public class PortableExecutionTest implements Serializable {
flinkJobExecutor.shutdown();
}
- private static ArrayList<KV<String, Iterable<Long>>> outputValues = new ArrayList<>();
-
- @Test
+ @Test(timeout = 120_000)
public void testExecution() throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(CrashingRunner.class);
@@ -92,45 +89,42 @@ public class PortableExecutionTest implements Serializable {
.as(PortablePipelineOptions.class)
.setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
Pipeline p = Pipeline.create(options);
- p.apply("impulse", Impulse.create())
- .apply(
- "create",
- ParDo.of(
- new DoFn<byte[], String>() {
- @ProcessElement
- public void process(ProcessContext ctxt) {
- ctxt.output("zero");
- ctxt.output("one");
- ctxt.output("two");
- }
- }))
- .apply(
- "len",
- ParDo.of(
- new DoFn<String, Long>() {
- @ProcessElement
- public void process(ProcessContext ctxt) {
- ctxt.output((long) ctxt.element().length());
- }
- }))
- .apply("addKeys", WithKeys.of("foo"))
- // Use some unknown coders
- .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of()))
- // Force the output to be materialized
- .apply("gbk", GroupByKey.create())
- .apply(
- "collect",
- ParDo.of(
- new DoFn<KV<String, Iterable<Long>>, Void>() {
- @ProcessElement
- public void process(ProcessContext ctx) {
- outputValues.add(ctx.element());
- }
- }));
+ PCollection<KV<String, Iterable<Long>>> result =
+ p.apply("impulse", Impulse.create())
+ .apply(
+ "create",
+ ParDo.of(
+ new DoFn<byte[], String>() {
+ @ProcessElement
+ public void process(ProcessContext ctxt) {
+ ctxt.output("zero");
+ ctxt.output("one");
+ ctxt.output("two");
+ }
+ }))
+ .apply(
+ "len",
+ ParDo.of(
+ new DoFn<String, Long>() {
+ @ProcessElement
+ public void process(ProcessContext ctxt) {
+ ctxt.output((long) ctxt.element().length());
+ }
+ }))
+ .apply("addKeys", WithKeys.of("foo"))
+ // Use some unknown coders
+ .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of()))
+ // Force the output to be materialized
+ .apply("gbk", GroupByKey.create());
+
+ PAssert.that(result).containsInAnyOrder(KV.of("foo", ImmutableList.of(4L, 3L, 3L)));
+
+ // The line below is required to convert the PAssert's read to an impulse, which is expected
+ // by the GreedyPipelineFuser.
+ p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
- outputValues.clear();
// execute the pipeline
FlinkJobInvocation jobInvocation =
FlinkJobInvocation.create(
@@ -140,16 +134,10 @@ public class PortableExecutionTest implements Serializable {
pipelineProto,
options.as(FlinkPipelineOptions.class),
null,
- Collections.EMPTY_LIST);
+ Collections.emptyList());
jobInvocation.start();
- long timeout = System.currentTimeMillis() + 60 * 1000;
- while (jobInvocation.getState() != Enum.DONE && System.currentTimeMillis() < timeout) {
+ while (jobInvocation.getState() != Enum.DONE) {
Thread.sleep(1000);
}
- assertEquals("job state", Enum.DONE, jobInvocation.getState());
-
- assertEquals(1, outputValues.size());
- assertEquals("foo", outputValues.get(0).getKey());
- assertThat(outputValues.get(0).getValue(), containsInAnyOrder(4L, 3L, 3L));
}
}
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
index 05194d1..a658a1c 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
@@ -17,20 +17,15 @@
*/
package org.apache.beam.runners.flink;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.Serializable;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.Executors;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.Environments;
+import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.VarIntCoder;
@@ -41,10 +36,12 @@ import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -60,7 +57,7 @@ import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class PortableStateExecutionTest implements Serializable {
- @Parameters
+ @Parameters(name = "streaming: {0}")
public static Object[] data() {
return new Object[] {true, false};
}
@@ -79,21 +76,11 @@ public class PortableStateExecutionTest implements Serializable {
flinkJobExecutor.shutdown();
}
- // State -> Key -> Value
- private static final Map<String, Map<String, Integer>> stateValuesMap = new HashMap<>();
-
- @Before
- public void before() {
- stateValuesMap.clear();
- stateValuesMap.put("valueState", new HashMap<>());
- stateValuesMap.put("valueState2", new HashMap<>());
- }
-
// Special values which clear / write out state
private static final int CLEAR_STATE = -1;
- private static final int WRITE_STATE_TO_MAP = -2;
+ private static final int WRITE_STATE = -2;
- @Test
+ @Test(timeout = 120_000)
public void testExecution() throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(CrashingRunner.class);
@@ -103,74 +90,93 @@ public class PortableStateExecutionTest implements Serializable {
.as(PortablePipelineOptions.class)
.setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
Pipeline p = Pipeline.create(options);
- p.apply(Impulse.create())
- .apply(
- ParDo.of(
- new DoFn<byte[], KV<String, Integer>>() {
- @ProcessElement
- public void process(ProcessContext ctx) {
- // Values == -1 will clear the state
- ctx.output(KV.of("clearedState", 1));
- ctx.output(KV.of("clearedState", CLEAR_STATE));
- // values >= 1 will be added on top of each other
- ctx.output(KV.of("bla1", 42));
- ctx.output(KV.of("bla", 23));
- ctx.output(KV.of("bla2", 64));
- ctx.output(KV.of("bla", 1));
- ctx.output(KV.of("bla", 1));
- // values == -2 will write the state to a map
- ctx.output(KV.of("bla", WRITE_STATE_TO_MAP));
- ctx.output(KV.of("bla1", WRITE_STATE_TO_MAP));
- ctx.output(KV.of("bla2", WRITE_STATE_TO_MAP));
- ctx.output(KV.of("clearedState", -2));
- }
- }))
- .apply(
- "statefulDoFn",
- ParDo.of(
- new DoFn<KV<String, Integer>, String>() {
- @StateId("valueState")
- private final StateSpec<ValueState<Integer>> valueStateSpec =
- StateSpecs.value(VarIntCoder.of());
-
- @StateId("valueState2")
- private final StateSpec<ValueState<Integer>> valueStateSpec2 =
- StateSpecs.value(VarIntCoder.of());
-
- @ProcessElement
- public void process(
- ProcessContext ctx,
- @StateId("valueState") ValueState<Integer> valueState,
- @StateId("valueState2") ValueState<Integer> valueState2) {
- performStateUpdates("valueState", ctx, valueState);
- performStateUpdates("valueState2", ctx, valueState2);
- }
-
- private void performStateUpdates(
- String stateId, ProcessContext ctx, ValueState<Integer> valueState) {
- Map<String, Integer> stateValues = stateValuesMap.get(stateId);
- Integer value = ctx.element().getValue();
- if (value == null) {
- throw new IllegalStateException();
- }
- switch (value) {
- case CLEAR_STATE:
- valueState.clear();
- break;
- case WRITE_STATE_TO_MAP:
- stateValues.put(ctx.element().getKey(), valueState.read());
- break;
- default:
- Integer currentState = valueState.read();
- if (currentState == null) {
- currentState = value;
- } else {
- currentState += value;
+ PCollection<KV<String, String>> output =
+ p.apply(Impulse.create())
+ .apply(
+ ParDo.of(
+ new DoFn<byte[], KV<String, Integer>>() {
+ @ProcessElement
+ public void process(ProcessContext ctx) {
+ // Values == -1 will clear the state
+ ctx.output(KV.of("clearedState", 1));
+ ctx.output(KV.of("clearedState", CLEAR_STATE));
+ // values >= 1 will be added on top of each other
+ ctx.output(KV.of("bla1", 42));
+ ctx.output(KV.of("bla", 23));
+ ctx.output(KV.of("bla2", 64));
+ ctx.output(KV.of("bla", 1));
+ ctx.output(KV.of("bla", 1));
+ // values == -2 will write the current state to the output
+ ctx.output(KV.of("bla", WRITE_STATE));
+ ctx.output(KV.of("bla1", WRITE_STATE));
+ ctx.output(KV.of("bla2", WRITE_STATE));
+ ctx.output(KV.of("clearedState", WRITE_STATE));
+ }
+ }))
+ .apply(
+ "statefulDoFn",
+ ParDo.of(
+ new DoFn<KV<String, Integer>, KV<String, String>>() {
+ @StateId("valueState")
+ private final StateSpec<ValueState<Integer>> valueStateSpec =
+ StateSpecs.value(VarIntCoder.of());
+
+ @StateId("valueState2")
+ private final StateSpec<ValueState<Integer>> valueStateSpec2 =
+ StateSpecs.value(VarIntCoder.of());
+
+ @ProcessElement
+ public void process(
+ ProcessContext ctx,
+ @StateId("valueState") ValueState<Integer> valueState,
+ @StateId("valueState2") ValueState<Integer> valueState2) {
+ performStateUpdates(ctx, valueState);
+ performStateUpdates(ctx, valueState2);
+ }
+
+ private void performStateUpdates(
+ ProcessContext ctx, ValueState<Integer> valueState) {
+ Integer value = ctx.element().getValue();
+ if (value == null) {
+ throw new IllegalStateException();
}
- valueState.write(currentState);
- }
- }
- }));
+ switch (value) {
+ case CLEAR_STATE:
+ valueState.clear();
+ break;
+ case WRITE_STATE:
+ Integer read = valueState.read();
+ ctx.output(
+ KV.of(
+ ctx.element().getKey(),
+ read == null ? "null" : read.toString()));
+ break;
+ default:
+ Integer currentState = valueState.read();
+ if (currentState == null) {
+ currentState = value;
+ } else {
+ currentState += value;
+ }
+ valueState.write(currentState);
+ }
+ }
+ }));
+
+ PAssert.that(output)
+ .containsInAnyOrder(
+ KV.of("bla", "25"),
+ KV.of("bla1", "42"),
+ KV.of("bla2", "64"),
+ KV.of("clearedState", "null"),
+ KV.of("bla", "25"),
+ KV.of("bla1", "42"),
+ KV.of("bla2", "64"),
+ KV.of("clearedState", "null"));
+
+ // The line below is required to convert the PAssert's read to an impulse, which is expected
+ // by the GreedyPipelineFuser.
+ p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
@@ -185,20 +191,9 @@ public class PortableStateExecutionTest implements Serializable {
Collections.emptyList());
jobInvocation.start();
- long timeout = System.currentTimeMillis() + 60 * 1000;
- while (jobInvocation.getState() != Enum.DONE && System.currentTimeMillis() < timeout) {
- Thread.sleep(1000);
- }
- assertThat(jobInvocation.getState(), is(Enum.DONE));
- Map<String, Integer> expected = new HashMap<>();
- expected.put("bla", 25);
- expected.put("bla1", 42);
- expected.put("bla2", 64);
- expected.put("clearedState", null);
-
- for (Map<String, Integer> statesValues : stateValuesMap.values()) {
- assertThat(statesValues, equalTo(expected));
+ while (jobInvocation.getState() != Enum.DONE) {
+ Thread.sleep(1000);
}
}
}
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
index 58db1ba..d9639b8 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
@@ -67,7 +67,7 @@ import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class PortableTimersExecutionTest implements Serializable {
- @Parameters
+ @Parameters(name = "streaming: {0}")
public static Object[] testModes() {
return new Object[] {true, false};
}