You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2018/12/21 19:03:11 UTC

[beam] branch master updated: [BEAM-6283] Convert PortableStateExecutionTest and PortableExecutionTest to using PAssert

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 845de99  [BEAM-6283] Convert PortableStateExecutionTest and PortableExecutionTest to using PAssert
845de99 is described below

commit 845de99bb3db353fc219333f736bd02bae65a5ad
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Fri Dec 21 20:03:03 2018 +0100

    [BEAM-6283] Convert PortableStateExecutionTest and PortableExecutionTest to using PAssert
    
    This fixes a race condition in the tests when writing results to a static map.
---
 .../beam/runners/flink/PortableExecutionTest.java  |  94 +++++-----
 .../runners/flink/PortableStateExecutionTest.java  | 193 ++++++++++-----------
 .../runners/flink/PortableTimersExecutionTest.java |   2 +-
 3 files changed, 136 insertions(+), 153 deletions(-)

diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
index 34985d7..9542bdd 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
@@ -17,19 +17,16 @@
  */
 package org.apache.beam.runners.flink;
 
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-
+import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.concurrent.Executors;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.Environments;
+import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
@@ -39,12 +36,14 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -61,7 +60,7 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class PortableExecutionTest implements Serializable {
 
-  @Parameters
+  @Parameters(name = "streaming: {0}")
   public static Object[] data() {
     return new Object[] {true, false};
   }
@@ -80,9 +79,7 @@ public class PortableExecutionTest implements Serializable {
     flinkJobExecutor.shutdown();
   }
 
-  private static ArrayList<KV<String, Iterable<Long>>> outputValues = new ArrayList<>();
-
-  @Test
+  @Test(timeout = 120_000)
   public void testExecution() throws Exception {
     PipelineOptions options = PipelineOptionsFactory.create();
     options.setRunner(CrashingRunner.class);
@@ -92,45 +89,42 @@ public class PortableExecutionTest implements Serializable {
         .as(PortablePipelineOptions.class)
         .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
     Pipeline p = Pipeline.create(options);
-    p.apply("impulse", Impulse.create())
-        .apply(
-            "create",
-            ParDo.of(
-                new DoFn<byte[], String>() {
-                  @ProcessElement
-                  public void process(ProcessContext ctxt) {
-                    ctxt.output("zero");
-                    ctxt.output("one");
-                    ctxt.output("two");
-                  }
-                }))
-        .apply(
-            "len",
-            ParDo.of(
-                new DoFn<String, Long>() {
-                  @ProcessElement
-                  public void process(ProcessContext ctxt) {
-                    ctxt.output((long) ctxt.element().length());
-                  }
-                }))
-        .apply("addKeys", WithKeys.of("foo"))
-        // Use some unknown coders
-        .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of()))
-        // Force the output to be materialized
-        .apply("gbk", GroupByKey.create())
-        .apply(
-            "collect",
-            ParDo.of(
-                new DoFn<KV<String, Iterable<Long>>, Void>() {
-                  @ProcessElement
-                  public void process(ProcessContext ctx) {
-                    outputValues.add(ctx.element());
-                  }
-                }));
+    PCollection<KV<String, Iterable<Long>>> result =
+        p.apply("impulse", Impulse.create())
+            .apply(
+                "create",
+                ParDo.of(
+                    new DoFn<byte[], String>() {
+                      @ProcessElement
+                      public void process(ProcessContext ctxt) {
+                        ctxt.output("zero");
+                        ctxt.output("one");
+                        ctxt.output("two");
+                      }
+                    }))
+            .apply(
+                "len",
+                ParDo.of(
+                    new DoFn<String, Long>() {
+                      @ProcessElement
+                      public void process(ProcessContext ctxt) {
+                        ctxt.output((long) ctxt.element().length());
+                      }
+                    }))
+            .apply("addKeys", WithKeys.of("foo"))
+            // Use some unknown coders
+            .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianLongCoder.of()))
+            // Force the output to be materialized
+            .apply("gbk", GroupByKey.create());
+
+    PAssert.that(result).containsInAnyOrder(KV.of("foo", ImmutableList.of(4L, 3L, 3L)));
+
+    // The line below is required to convert the PAssert's read to an impulse, which is expected
+    // by the GreedyPipelineFuser.
+    p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
 
     RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
 
-    outputValues.clear();
     // execute the pipeline
     FlinkJobInvocation jobInvocation =
         FlinkJobInvocation.create(
@@ -140,16 +134,10 @@ public class PortableExecutionTest implements Serializable {
             pipelineProto,
             options.as(FlinkPipelineOptions.class),
             null,
-            Collections.EMPTY_LIST);
+            Collections.emptyList());
     jobInvocation.start();
-    long timeout = System.currentTimeMillis() + 60 * 1000;
-    while (jobInvocation.getState() != Enum.DONE && System.currentTimeMillis() < timeout) {
+    while (jobInvocation.getState() != Enum.DONE) {
       Thread.sleep(1000);
     }
-    assertEquals("job state", Enum.DONE, jobInvocation.getState());
-
-    assertEquals(1, outputValues.size());
-    assertEquals("foo", outputValues.get(0).getKey());
-    assertThat(outputValues.get(0).getValue(), containsInAnyOrder(4L, 3L, 3L));
   }
 }
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
index 05194d1..a658a1c 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
@@ -17,20 +17,15 @@
  */
 package org.apache.beam.runners.flink;
 
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.io.Serializable;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.Executors;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.Environments;
+import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -41,10 +36,12 @@ import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.testing.CrashingRunner;
+import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -60,7 +57,7 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class PortableStateExecutionTest implements Serializable {
 
-  @Parameters
+  @Parameters(name = "streaming: {0}")
   public static Object[] data() {
     return new Object[] {true, false};
   }
@@ -79,21 +76,11 @@ public class PortableStateExecutionTest implements Serializable {
     flinkJobExecutor.shutdown();
   }
 
-  // State -> Key -> Value
-  private static final Map<String, Map<String, Integer>> stateValuesMap = new HashMap<>();
-
-  @Before
-  public void before() {
-    stateValuesMap.clear();
-    stateValuesMap.put("valueState", new HashMap<>());
-    stateValuesMap.put("valueState2", new HashMap<>());
-  }
-
   // Special values which clear / write out state
   private static final int CLEAR_STATE = -1;
-  private static final int WRITE_STATE_TO_MAP = -2;
+  private static final int WRITE_STATE = -2;
 
-  @Test
+  @Test(timeout = 120_000)
   public void testExecution() throws Exception {
     PipelineOptions options = PipelineOptionsFactory.create();
     options.setRunner(CrashingRunner.class);
@@ -103,74 +90,93 @@ public class PortableStateExecutionTest implements Serializable {
         .as(PortablePipelineOptions.class)
         .setDefaultEnvironmentType(Environments.ENVIRONMENT_EMBEDDED);
     Pipeline p = Pipeline.create(options);
-    p.apply(Impulse.create())
-        .apply(
-            ParDo.of(
-                new DoFn<byte[], KV<String, Integer>>() {
-                  @ProcessElement
-                  public void process(ProcessContext ctx) {
-                    // Values == -1 will clear the state
-                    ctx.output(KV.of("clearedState", 1));
-                    ctx.output(KV.of("clearedState", CLEAR_STATE));
-                    // values >= 1 will be added on top of each other
-                    ctx.output(KV.of("bla1", 42));
-                    ctx.output(KV.of("bla", 23));
-                    ctx.output(KV.of("bla2", 64));
-                    ctx.output(KV.of("bla", 1));
-                    ctx.output(KV.of("bla", 1));
-                    // values == -2 will write the state to a map
-                    ctx.output(KV.of("bla", WRITE_STATE_TO_MAP));
-                    ctx.output(KV.of("bla1", WRITE_STATE_TO_MAP));
-                    ctx.output(KV.of("bla2", WRITE_STATE_TO_MAP));
-                    ctx.output(KV.of("clearedState", -2));
-                  }
-                }))
-        .apply(
-            "statefulDoFn",
-            ParDo.of(
-                new DoFn<KV<String, Integer>, String>() {
-                  @StateId("valueState")
-                  private final StateSpec<ValueState<Integer>> valueStateSpec =
-                      StateSpecs.value(VarIntCoder.of());
-
-                  @StateId("valueState2")
-                  private final StateSpec<ValueState<Integer>> valueStateSpec2 =
-                      StateSpecs.value(VarIntCoder.of());
-
-                  @ProcessElement
-                  public void process(
-                      ProcessContext ctx,
-                      @StateId("valueState") ValueState<Integer> valueState,
-                      @StateId("valueState2") ValueState<Integer> valueState2) {
-                    performStateUpdates("valueState", ctx, valueState);
-                    performStateUpdates("valueState2", ctx, valueState2);
-                  }
-
-                  private void performStateUpdates(
-                      String stateId, ProcessContext ctx, ValueState<Integer> valueState) {
-                    Map<String, Integer> stateValues = stateValuesMap.get(stateId);
-                    Integer value = ctx.element().getValue();
-                    if (value == null) {
-                      throw new IllegalStateException();
-                    }
-                    switch (value) {
-                      case CLEAR_STATE:
-                        valueState.clear();
-                        break;
-                      case WRITE_STATE_TO_MAP:
-                        stateValues.put(ctx.element().getKey(), valueState.read());
-                        break;
-                      default:
-                        Integer currentState = valueState.read();
-                        if (currentState == null) {
-                          currentState = value;
-                        } else {
-                          currentState += value;
+    PCollection<KV<String, String>> output =
+        p.apply(Impulse.create())
+            .apply(
+                ParDo.of(
+                    new DoFn<byte[], KV<String, Integer>>() {
+                      @ProcessElement
+                      public void process(ProcessContext ctx) {
+                        // Values == -1 will clear the state
+                        ctx.output(KV.of("clearedState", 1));
+                        ctx.output(KV.of("clearedState", CLEAR_STATE));
+                        // values >= 1 will be added on top of each other
+                        ctx.output(KV.of("bla1", 42));
+                        ctx.output(KV.of("bla", 23));
+                        ctx.output(KV.of("bla2", 64));
+                        ctx.output(KV.of("bla", 1));
+                        ctx.output(KV.of("bla", 1));
+                        // values == -2 will write the current state to the output
+                        ctx.output(KV.of("bla", WRITE_STATE));
+                        ctx.output(KV.of("bla1", WRITE_STATE));
+                        ctx.output(KV.of("bla2", WRITE_STATE));
+                        ctx.output(KV.of("clearedState", WRITE_STATE));
+                      }
+                    }))
+            .apply(
+                "statefulDoFn",
+                ParDo.of(
+                    new DoFn<KV<String, Integer>, KV<String, String>>() {
+                      @StateId("valueState")
+                      private final StateSpec<ValueState<Integer>> valueStateSpec =
+                          StateSpecs.value(VarIntCoder.of());
+
+                      @StateId("valueState2")
+                      private final StateSpec<ValueState<Integer>> valueStateSpec2 =
+                          StateSpecs.value(VarIntCoder.of());
+
+                      @ProcessElement
+                      public void process(
+                          ProcessContext ctx,
+                          @StateId("valueState") ValueState<Integer> valueState,
+                          @StateId("valueState2") ValueState<Integer> valueState2) {
+                        performStateUpdates(ctx, valueState);
+                        performStateUpdates(ctx, valueState2);
+                      }
+
+                      private void performStateUpdates(
+                          ProcessContext ctx, ValueState<Integer> valueState) {
+                        Integer value = ctx.element().getValue();
+                        if (value == null) {
+                          throw new IllegalStateException();
                         }
-                        valueState.write(currentState);
-                    }
-                  }
-                }));
+                        switch (value) {
+                          case CLEAR_STATE:
+                            valueState.clear();
+                            break;
+                          case WRITE_STATE:
+                            Integer read = valueState.read();
+                            ctx.output(
+                                KV.of(
+                                    ctx.element().getKey(),
+                                    read == null ? "null" : read.toString()));
+                            break;
+                          default:
+                            Integer currentState = valueState.read();
+                            if (currentState == null) {
+                              currentState = value;
+                            } else {
+                              currentState += value;
+                            }
+                            valueState.write(currentState);
+                        }
+                      }
+                    }));
+
+    PAssert.that(output)
+        .containsInAnyOrder(
+            KV.of("bla", "25"),
+            KV.of("bla1", "42"),
+            KV.of("bla2", "64"),
+            KV.of("clearedState", "null"),
+            KV.of("bla", "25"),
+            KV.of("bla1", "42"),
+            KV.of("bla2", "64"),
+            KV.of("clearedState", "null"));
+
+    // The line below is required to convert the PAssert's read to an impulse, which is expected
+    // by the GreedyPipelineFuser.
+    p.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
 
     RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
 
@@ -185,20 +191,9 @@ public class PortableStateExecutionTest implements Serializable {
             Collections.emptyList());
 
     jobInvocation.start();
-    long timeout = System.currentTimeMillis() + 60 * 1000;
-    while (jobInvocation.getState() != Enum.DONE && System.currentTimeMillis() < timeout) {
-      Thread.sleep(1000);
-    }
-    assertThat(jobInvocation.getState(), is(Enum.DONE));
 
-    Map<String, Integer> expected = new HashMap<>();
-    expected.put("bla", 25);
-    expected.put("bla1", 42);
-    expected.put("bla2", 64);
-    expected.put("clearedState", null);
-
-    for (Map<String, Integer> statesValues : stateValuesMap.values()) {
-      assertThat(statesValues, equalTo(expected));
+    while (jobInvocation.getState() != Enum.DONE) {
+      Thread.sleep(1000);
     }
   }
 }
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
index 58db1ba..d9639b8 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
@@ -67,7 +67,7 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class PortableTimersExecutionTest implements Serializable {
 
-  @Parameters
+  @Parameters(name = "streaming: {0}")
   public static Object[] testModes() {
     return new Object[] {true, false};
   }