You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/01/22 20:07:04 UTC

[flink] branch master updated: [FLINK-21050][examples-table] Add some advanced function examples

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c696f03  [FLINK-21050][examples-table] Add some advanced function examples
c696f03 is described below

commit c696f0361c8196e4015922ae5228aeb93ec74e4d
Author: Timo Walther <tw...@apache.org>
AuthorDate: Thu Jan 21 10:52:22 2021 +0100

    [FLINK-21050][examples-table] Add some advanced function examples
    
    This closes #14714.
---
 docs/dev/table/functions/udfs.md                   |   3 +
 docs/dev/table/functions/udfs.zh.md                |   3 +
 flink-examples/flink-examples-table/pom.xml        |  24 ++++
 .../java/functions/AdvancedFunctionsExample.java   | 125 +++++++++++++++++++
 .../java/functions/InternalRowMergerFunction.java  | 134 +++++++++++++++++++++
 .../java/functions/LastDatedValueFunction.java     | 132 ++++++++++++++++++++
 .../functions/AdvancedFunctionsExampleITCase.java  |  81 +++++++++++++
 7 files changed, 502 insertions(+)

diff --git a/docs/dev/table/functions/udfs.md b/docs/dev/table/functions/udfs.md
index 7455233..cbbb87d 100644
--- a/docs/dev/table/functions/udfs.md
+++ b/docs/dev/table/functions/udfs.md
@@ -551,6 +551,9 @@ public static class LiteralFunction extends ScalarFunction {
 
 </div>
 
+For more examples of custom type inference, see the `flink-examples-table` module, which contains an
+{% gh_link flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExample.java "advanced function implementation" %}.
+
 ### Determinism
 
 Every user-defined function class can declare whether it produces deterministic results or not by overriding
diff --git a/docs/dev/table/functions/udfs.zh.md b/docs/dev/table/functions/udfs.zh.md
index c470468..cb1a416 100644
--- a/docs/dev/table/functions/udfs.zh.md
+++ b/docs/dev/table/functions/udfs.zh.md
@@ -537,6 +537,9 @@ public static class LiteralFunction extends ScalarFunction {
 
 </div>
 
+For more examples of custom type inference, see the `flink-examples-table` module, which contains an
+{% gh_link flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExample.java "advanced function implementation" %}.
+
 ### 运行时集成
 -------------------
 
diff --git a/flink-examples/flink-examples-table/pom.xml b/flink-examples/flink-examples-table/pom.xml
index 52b2ab3..b9e6586 100644
--- a/flink-examples/flink-examples-table/pom.xml
+++ b/flink-examples/flink-examples-table/pom.xml
@@ -283,6 +283,30 @@ under the License.
 					</execution>
 
 					<execution>
+						<id>AdvancedFunctionsExample</id>
+						<phase>package</phase>
+						<goals>
+							<goal>jar</goal>
+						</goals>
+
+						<configuration>
+							<classifier>AdvancedFunctionsExample</classifier>
+
+							<archive>
+								<manifestEntries>
+									<program-class>org.apache.flink.table.examples.java.functions.AdvancedFunctionsExample</program-class>
+								</manifestEntries>
+							</archive>
+
+							<includes>
+								<include>org/apache/flink/table/examples/java/functions/*</include>
+								<include>META-INF/LICENSE</include>
+								<include>META-INF/NOTICE</include>
+							</includes>
+						</configuration>
+					</execution>
+
+					<execution>
 						<id>StreamTableExample</id>
 						<phase>package</phase>
 						<goals>
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExample.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExample.java
new file mode 100644
index 0000000..4a734a5
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExample.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.functions;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.types.Row;
+
+import java.time.LocalDate;
+
+/**
+ * Example for implementing more complex {@link UserDefinedFunction}s.
+ *
+ * <p>In many use cases, function signatures can be reflectively extracted from a UDF class. The
+ * annotations {@link DataTypeHint} and {@link FunctionHint} help if reflective information is not
+ * enough and needs to be enriched with further logical details. Check the website documentation as
+ * well as the docs of {@link ScalarFunction}, {@link TableFunction}, and {@link AggregateFunction}
+ * for more information.
+ *
+ * <p>Both reflective extraction and annotations are suitable for function signatures with fixed
+ * input and output types. However, for advanced use cases it might be required to derive an output
+ * type from one of the argument types or perform stricter validation.
+ *
+ * <p>This example demonstrates various UDF implementations. We are executing multiple Flink jobs
+ * where the result is written to stdout.
+ */
+public class AdvancedFunctionsExample {
+
+    public static void main(String[] args) throws Exception {
+        // set up a batch-mode table environment
+        final EnvironmentSettings settings =
+                EnvironmentSettings.newInstance().inBatchMode().build();
+        final TableEnvironment env = TableEnvironment.create(settings);
+
+        // execute different kinds of functions, one job after the other
+        executeLastDatedValueFunction(env);
+        executeInternalRowMergerFunction(env);
+    }
+
+    /**
+     * Aggregates data by name and returns the latest non-null {@code item_count} value with its
+     * corresponding {@code order_date}.
+     */
+    private static void executeLastDatedValueFunction(TableEnvironment env) {
+        // create a table with example data and expose it to SQL as "customers"
+        final Table customers =
+                env.fromValues(
+                        DataTypes.of("ROW<name STRING, order_date DATE, item_count INT>"),
+                        Row.of("Guillermo Smith", LocalDate.parse("2020-12-01"), 3),
+                        Row.of("Guillermo Smith", LocalDate.parse("2020-12-05"), 5),
+                        Row.of("Valeria Mendoza", LocalDate.parse("2020-03-23"), 4),
+                        Row.of("Valeria Mendoza", LocalDate.parse("2020-06-02"), 10),
+                        Row.of("Leann Holloway", LocalDate.parse("2020-05-26"), 9),
+                        Row.of("Leann Holloway", LocalDate.parse("2020-05-27"), null),
+                        Row.of("Brandy Sanders", LocalDate.parse("2020-10-14"), 1),
+                        Row.of("John Turner", LocalDate.parse("2020-10-02"), 12),
+                        Row.of("Ellen Ortega", LocalDate.parse("2020-06-18"), 100));
+        env.createTemporaryView("customers", customers);
+
+        // register the function under a SQL name, run the query, and print its result
+        env.createTemporarySystemFunction("LastDatedValueFunction", LastDatedValueFunction.class);
+        env.executeSql(
+                        "SELECT name, LastDatedValueFunction(item_count, order_date) "
+                                + "FROM customers GROUP BY name")
+                .print();
+
+        // clean up so that the view name "customers" can be reused by the next example
+        env.dropTemporaryView("customers");
+    }
+
+    /** Merges two rows as efficiently as possible using internal data structures. */
+    private static void executeInternalRowMergerFunction(TableEnvironment env) {
+        // create a table with example data and expose it to SQL as "customers"
+        final Table customers =
+                env.fromValues(
+                        DataTypes.of(
+                                "ROW<name STRING, data1 ROW<birth_date DATE>, data2 ROW<city STRING, phone STRING>>"),
+                        Row.of(
+                                "Guillermo Smith",
+                                Row.of(LocalDate.parse("1992-12-12")),
+                                Row.of("New Jersey", "816-443-8010")),
+                        Row.of(
+                                "Valeria Mendoza",
+                                Row.of(LocalDate.parse("1970-03-28")),
+                                Row.of("Los Angeles", "928-264-9662")),
+                        Row.of(
+                                "Leann Holloway",
+                                Row.of(LocalDate.parse("1989-05-21")),
+                                Row.of("Eugene", "614-889-6038")));
+        env.createTemporaryView("customers", customers);
+
+        // register the function under a SQL name, run the query, and print its result
+        env.createTemporarySystemFunction(
+                "InternalRowMergerFunction", InternalRowMergerFunction.class);
+        env.executeSql("SELECT name, InternalRowMergerFunction(data1, data2) FROM customers")
+                .print();
+
+        // clean up the temporary view
+        env.dropTemporaryView("customers");
+    }
+}
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/InternalRowMergerFunction.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/InternalRowMergerFunction.java
new file mode 100644
index 0000000..f0cfe4f
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/InternalRowMergerFunction.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.functions;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.Signature.Argument;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+/**
+ * Merges two rows into a single row with unique field names.
+ *
+ * <p>The function uses a custom {@link TypeInference} and thus disables any of the default
+ * reflection-based logic. It uses the internal data structure {@link RowData} for both the
+ * arguments and the result of the {@code eval} method.
+ *
+ * <p>For code readability, we might use some internal utility methods that should rarely change.
+ * Implementers can copy those if they don't want to rely on non-official API.
+ */
+public final class InternalRowMergerFunction extends ScalarFunction {
+
+    // --------------------------------------------------------------------------------------------
+    // Planning
+    // --------------------------------------------------------------------------------------------
+
+    @Override
+    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+        return TypeInference.newBuilder()
+                // accept a signature (ROW, ROW) with arbitrary field types but
+                // with internal conversion classes
+                .inputTypeStrategy(
+                        new InputTypeStrategy() {
+                            @Override
+                            public ArgumentCount getArgumentCount() {
+                                // the argument count is checked before input types are inferred
+                                return ConstantArgumentCount.of(2);
+                            }
+
+                            @Override
+                            public Optional<List<DataType>> inferInputTypes(
+                                    CallContext callContext, boolean throwOnFailure) {
+                                final List<DataType> args = callContext.getArgumentDataTypes();
+                                final DataType arg0 = args.get(0);
+                                final DataType arg1 = args.get(1);
+                                // reject the call unless both arguments are of logical type ROW
+                                if (arg0.getLogicalType().getTypeRoot() != LogicalTypeRoot.ROW
+                                        || arg1.getLogicalType().getTypeRoot()
+                                                != LogicalTypeRoot.ROW) {
+                                    if (throwOnFailure) {
+                                        throw callContext.newValidationError(
+                                                "Two row arguments expected.");
+                                    }
+                                    return Optional.empty();
+                                }
+                                // keep the original logical type but express that both arguments
+                                // should use internal data structures
+                                return Optional.of(
+                                        Arrays.asList(
+                                                arg0.bridgedTo(RowData.class),
+                                                arg1.bridgedTo(RowData.class)));
+                            }
+
+                            @Override
+                            public List<Signature> getExpectedSignatures(
+                                    FunctionDefinition definition) {
+                                // this helps in printing nice error messages
+                                return Collections.singletonList(
+                                        Signature.of(Argument.of("ROW"), Argument.of("ROW")));
+                            }
+                        })
+                .outputTypeStrategy(
+                        callContext -> {
+                            // merge the fields of both rows and name them f0, f1, ... by position
+                            final List<DataType> args = callContext.getArgumentDataTypes();
+                            final List<DataType> allFieldDataTypes = new ArrayList<>();
+                            allFieldDataTypes.addAll(args.get(0).getChildren());
+                            allFieldDataTypes.addAll(args.get(1).getChildren());
+                            final DataTypes.Field[] fields =
+                                    IntStream.range(0, allFieldDataTypes.size())
+                                            .mapToObj(
+                                                    i ->
+                                                            DataTypes.FIELD(
+                                                                    "f" + i,
+                                                                    allFieldDataTypes.get(i)))
+                                            .toArray(DataTypes.Field[]::new);
+                            // create a new row with the merged fields and express that the return
+                            // type will use an internal data structure
+                            return Optional.of(DataTypes.ROW(fields).bridgedTo(RowData.class));
+                        })
+                .build();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Runtime
+    // --------------------------------------------------------------------------------------------
+
+    public RowData eval(RowData r1, RowData r2) {
+        return new JoinedRowData(r1, r2);
+    }
+}
diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/LastDatedValueFunction.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/LastDatedValueFunction.java
new file mode 100644
index 0000000..641fecb
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/functions/LastDatedValueFunction.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.functions;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.InputTypeStrategies;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.types.Row;
+
+import java.time.LocalDate;
+import java.util.Optional;
+
+/**
+ * Implementation of an {@link AggregateFunction} that returns a row containing the latest non-null
+ * value with its corresponding date.
+ *
+ * <p>The function uses a custom {@link TypeInference} and thus disables any of the default
+ * reflection-based logic. It has a generic parameter {@code T} which will result in {@link Object}
+ * (due to type erasure) during runtime. The {@link TypeInference} provides the necessary
+ * information on how to call {@code accumulate(...)} for the given call in the query.
+ *
+ * <p>For code readability, we might use some internal utility methods that should rarely change.
+ * Implementers can copy those if they don't want to rely on non-official API.
+ *
+ * @param <T> input value
+ */
+public final class LastDatedValueFunction<T>
+        extends AggregateFunction<Row, LastDatedValueFunction.Accumulator<T>> {
+
+    // --------------------------------------------------------------------------------------------
+    // Planning
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Declares the {@link TypeInference} of this function. It specifies:
+     *
+     * <ul>
+     *   <li>which argument types are supported when calling this function,
+     *   <li>which {@link DataType#getConversionClass()} should be used when calling the JVM method
+     *       {@link #accumulate(Accumulator, Object, LocalDate)} during runtime,
+     *   <li>a similar strategy for deriving the accumulator type,
+     *   <li>and a similar strategy for deriving the output type.
+     * </ul>
+     */
+    @Override
+    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+        return TypeInference.newBuilder()
+                // accept a signature (ANY, DATE) both with default conversion classes,
+                // the input type strategy is mostly used to produce nicer validation exceptions
+                // during planning, implementers can decide to skip it if they are fine with failing
+                // at a later stage during code generation when the runtime method is checked
+                .inputTypeStrategy(
+                        InputTypeStrategies.sequence(
+                                InputTypeStrategies.ANY,
+                                InputTypeStrategies.explicit(DataTypes.DATE())))
+                // let the accumulator data type depend on the first input argument
+                .accumulatorTypeStrategy(
+                        callContext -> {
+                            final DataType argDataType = callContext.getArgumentDataTypes().get(0);
+                            final DataType accDataType =
+                                    DataTypes.STRUCTURED(
+                                            Accumulator.class,
+                                            DataTypes.FIELD("value", argDataType),
+                                            DataTypes.FIELD("date", DataTypes.DATE()));
+                            return Optional.of(accDataType);
+                        })
+                // let the output data type depend on the first input argument
+                .outputTypeStrategy(
+                        callContext -> {
+                            final DataType argDataType = callContext.getArgumentDataTypes().get(0);
+                            final DataType outputDataType =
+                                    DataTypes.ROW(
+                                            DataTypes.FIELD("value", argDataType),
+                                            DataTypes.FIELD("date", DataTypes.DATE()));
+                            return Optional.of(outputDataType);
+                        })
+                .build();
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Runtime
+    // --------------------------------------------------------------------------------------------
+
+    /**
+     * Generic accumulator for representing state. It will contain different kinds of instances for
+     * {@code value} depending on the actual call in the query.
+     */
+    public static class Accumulator<T> {
+        public T value;
+        public LocalDate date;
+    }
+
+    @Override
+    public Accumulator<T> createAccumulator() {
+        return new Accumulator<>();
+    }
+
+    /**
+     * Generic runtime function called with different kinds of instances for {@code input}. Rows
+     * with a null {@code input} are skipped; a null {@code date} never replaces a dated value.
+     */
+    public void accumulate(Accumulator<T> acc, T input, LocalDate date) {
+        if (input != null && (acc.date == null || (date != null && date.isAfter(acc.date)))) {
+            acc.value = input;
+            acc.date = date;
+        }
+    }
+
+    @Override
+    public Row getValue(Accumulator<T> acc) {
+        return Row.of(acc.value, acc.date);
+    }
+}
diff --git a/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java b/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java
new file mode 100644
index 0000000..35fe1ed
--- /dev/null
+++ b/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/java/functions/AdvancedFunctionsExampleITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.examples.java.functions;
+
+import org.apache.flink.table.examples.utils.ExampleOutputTestBase;
+
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertThat;
+
+/** Runs {@link AdvancedFunctionsExample} and checks its output for the expected result rows. */
+public class AdvancedFunctionsExampleITCase extends ExampleOutputTestBase {
+
+    @Test
+    public void testExample() throws Exception {
+        AdvancedFunctionsExample.main(new String[0]);
+        final String consoleOutput = getOutputString();
+
+        testExecuteLastDatedValueFunction(consoleOutput);
+        testExecuteInternalRowMergerFunction(consoleOutput);
+    }
+
+    private void testExecuteLastDatedValueFunction(String consoleOutput) {
+        assertThat(
+                consoleOutput,
+                containsString(
+                        "|                Guillermo Smith |              +I[5, 2020-12-05] |"));
+        assertThat(
+                consoleOutput,
+                containsString(
+                        "|                    John Turner |             +I[12, 2020-10-02] |"));
+        assertThat(
+                consoleOutput,
+                containsString(
+                        "|                 Brandy Sanders |              +I[1, 2020-10-14] |"));
+        assertThat(
+                consoleOutput,
+                containsString(
+                        "|                Valeria Mendoza |             +I[10, 2020-06-02] |"));
+        assertThat(
+                consoleOutput,
+                containsString(
+                        "|                   Ellen Ortega |            +I[100, 2020-06-18] |"));
+        assertThat(
+                consoleOutput,
+                containsString(
+                        "|                 Leann Holloway |              +I[9, 2020-05-26] |"));
+    }
+
+    private void testExecuteInternalRowMergerFunction(String consoleOutput) {
+        assertThat(
+                consoleOutput,
+                containsString(
+                        "|                Guillermo Smith | +I[1992-12-12, New Jersey, ... |"));
+        assertThat(
+                consoleOutput,
+                containsString(
+                        "|                Valeria Mendoza | +I[1970-03-28, Los Angeles,... |"));
+        assertThat(
+                consoleOutput,
+                containsString(
+                        "|                 Leann Holloway | +I[1989-05-21, Eugene, 614-... |"));
+    }
+}