You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/01/23 22:33:21 UTC
[kudu] branch master updated (5b1d17f -> 9443a69)
This is an automated email from the ASF dual-hosted git repository.
adar pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.
from 5b1d17f python: fix Python 3.4 based tests
new 9ea5433 [spark] Add metrics to kudu-spark
new 9443a69 [java] log warning if applying operation on a closed session
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../org/apache/kudu/client/AsyncKuduClient.java | 2 +-
.../org/apache/kudu/client/AsyncKuduSession.java | 5 +++
.../org/apache/kudu/client/TestKuduClient.java | 27 ++++++++++++-
.../org/apache/kudu/spark/kudu/DefaultSource.scala | 6 +--
.../org/apache/kudu/spark/kudu/KuduContext.scala | 24 ++++++++++-
.../scala/org/apache/kudu/spark/kudu/KuduRDD.scala | 14 ++++++-
.../apache/kudu/spark/kudu/DefaultSourceTest.scala | 47 +++++++++++++++-------
.../org/apache/kudu/spark/kudu/KuduRDDTest.scala | 4 +-
8 files changed, 104 insertions(+), 25 deletions(-)
[kudu] 01/02: [spark] Add metrics to kudu-spark
Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 9ea54330645af044817894dbf1c0d5b8a5185822
Author: Will Berkeley <wd...@gmail.org>
AuthorDate: Fri Jan 18 17:00:26 2019 -0800
[spark] Add metrics to kudu-spark
This adds some basic accumulator metrics to kudu-spark. These
accumulators appear for each stage involving a KuduContext, with values
broken down by executor.
Follow-ups will add some more sophisticated metrics and use metrics to
add additional logging.
In addition to the unit tests, I tested this on a 3-node cluster and
confirmed I was able to see the accumulator values for stages and
individual tasks on the Spark application's web UI, and that were
correct.
Change-Id: Ied81c890b3d4510f767d8f023963ff878f398140
Reviewed-on: http://gerrit.cloudera.org:8080/12251
Reviewed-by: Grant Henke <gr...@apache.org>
Tested-by: Will Berkeley <wd...@gmail.com>
---
.../org/apache/kudu/client/AsyncKuduClient.java | 2 +-
.../org/apache/kudu/spark/kudu/DefaultSource.scala | 6 +--
.../org/apache/kudu/spark/kudu/KuduContext.scala | 24 ++++++++++-
.../scala/org/apache/kudu/spark/kudu/KuduRDD.scala | 14 ++++++-
.../apache/kudu/spark/kudu/DefaultSourceTest.scala | 47 +++++++++++++++-------
.../org/apache/kudu/spark/kudu/KuduRDDTest.scala | 4 +-
6 files changed, 74 insertions(+), 23 deletions(-)
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 4ed25f6..9b5b280 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -432,7 +432,7 @@ public class AsyncKuduClient implements AutoCloseable {
new ServerInfo(getFakeMasterUuid(hostPort),
hostPort,
inetAddress,
- /*locaton=*/""),
+ /* location= */""),
credentialsPolicy);
}
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
index 7729eb9..388fac1 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/DefaultSource.scala
@@ -122,7 +122,7 @@ class DefaultSource
* @param sqlContext
* @param mode Only Append mode is supported. It will upsert or insert data
* to an existing table, depending on the upsert parameter
- * @param parameters Necessary parameters for kudu.table, kudu.master, etc..
+ * @param parameters Necessary parameters for kudu.table, kudu.master, etc...
* @param data Dataframe to save into kudu
* @return returns populated base relation
*/
@@ -401,10 +401,10 @@ class KuduRelation(
}
/**
- * Creates a new `IS NULL` predicate for the column.
+ * Creates a new `IS NOT NULL` predicate for the column.
*
* @param column the column name
- * @return the `IS NULL` predicate
+ * @return the `IS NOT NULL` predicate
*/
private def isNotNullPredicate(column: String): KuduPredicate = {
KuduPredicate.newIsNotNullPredicate(table.getSchema.getColumn(column))
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
index 5a12ad6..d6295f9 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduContext.scala
@@ -19,7 +19,6 @@ package org.apache.kudu.spark.kudu
import java.security.AccessController
import java.security.PrivilegedAction
-
import javax.security.auth.Subject
import javax.security.auth.login.AppConfigurationEntry
import javax.security.auth.login.Configuration
@@ -27,6 +26,7 @@ import javax.security.auth.login.LoginContext
import scala.collection.JavaConverters._
import scala.collection.mutable
+
import org.apache.hadoop.util.ShutdownHookManager
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
@@ -39,6 +39,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.util.AccumulatorV2
+import org.apache.spark.util.LongAccumulator
import org.apache.yetus.audience.InterfaceAudience
import org.apache.yetus.audience.InterfaceStability
import org.slf4j.Logger
@@ -65,6 +66,24 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
def this(kuduMaster: String, sc: SparkContext) = this(kuduMaster, sc, None)
/**
+ * A collection of accumulator metrics describing the usage of a KuduContext.
+ */
+ private[kudu] val numInserts: LongAccumulator = sc.longAccumulator("kudu.num_inserts")
+ private[kudu] val numUpserts: LongAccumulator = sc.longAccumulator("kudu.num_upserts")
+ private[kudu] val numUpdates: LongAccumulator = sc.longAccumulator("kudu.num_updates")
+ private[kudu] val numDeletes: LongAccumulator = sc.longAccumulator("kudu.num_deletes")
+
+ // Increments the appropriate metric given an OperationType and a count.
+ private def addForOperation(count: Long, opType: OperationType): Unit = {
+ opType match {
+ case Insert => numInserts.add(count)
+ case Upsert => numUpserts.add(count)
+ case Update => numUpdates.add(count)
+ case Delete => numDeletes.add(count)
+ }
+ }
+
+ /**
* TimestampAccumulator accumulates the maximum value of client's
* propagated timestamp of all executors and can only read by the
* driver.
@@ -333,6 +352,7 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
session.setFlushMode(FlushMode.AUTO_FLUSH_BACKGROUND)
session.setIgnoreAllDuplicateRows(writeOptions.ignoreDuplicateRowErrors)
val typeConverter = CatalystTypeConverters.createToScalaConverter(schema)
+ var numRows = 0
try {
for (internalRow <- rows) {
val row = typeConverter(internalRow).asInstanceOf[Row]
@@ -377,12 +397,14 @@ class KuduContext(val kuduMaster: String, sc: SparkContext, val socketReadTimeou
}
}
session.apply(operation)
+ numRows += 1
}
} finally {
session.close()
// Update timestampAccumulator with the client's last propagated
// timestamp on each executor.
timestampAccumulator.add(syncClient.getLastPropagatedTimestamp)
+ addForOperation(numRows, operationType)
}
session.getPendingErrors
}
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index 2deea16..356c290 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -17,13 +17,16 @@
package org.apache.kudu.spark.kudu
import scala.collection.JavaConverters._
+
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.Partition
import org.apache.spark.SparkContext
import org.apache.spark.TaskContext
+import org.apache.spark.util.LongAccumulator
import org.apache.yetus.audience.InterfaceAudience
import org.apache.yetus.audience.InterfaceStability
+
import org.apache.kudu.client._
import org.apache.kudu.Type
import org.apache.kudu.client
@@ -47,6 +50,10 @@ class KuduRDD private[kudu] (
// Defined here because the options are transient.
private val keepAlivePeriodMs = options.keepAlivePeriodMs
+ // A metric for the rows read from Kudu for this RDD.
+ // TODO(wdberkeley): Add bytes read if it becomes available from the Java client.
+ private[kudu] val rowsRead = sc.longAccumulator("kudu.rows_read")
+
override protected def getPartitions: Array[Partition] = {
val builder = kuduContext.syncClient
.newScanTokenBuilder(table)
@@ -93,7 +100,7 @@ class KuduRDD private[kudu] (
val partition: KuduPartition = part.asInstanceOf[KuduPartition]
val scanner =
KuduScanToken.deserializeIntoScanner(partition.scanToken, client)
- new RowIterator(scanner, kuduContext, keepAlivePeriodMs)
+ new RowIterator(scanner, kuduContext, keepAlivePeriodMs, rowsRead)
}
override def getPreferredLocations(partition: Partition): Seq[String] = {
@@ -115,11 +122,13 @@ private class KuduPartition(
* @param scanner the wrapped scanner
* @param kuduContext the kudu context
* @param keepAlivePeriodMs the period in which to call the keepAlive on the scanners
+ * @param rowsRead an accumulator to track the number of rows read from Kudu
*/
private class RowIterator(
val scanner: KuduScanner,
val kuduContext: KuduContext,
- val keepAlivePeriodMs: Long)
+ val keepAlivePeriodMs: Long,
+ val rowsRead: LongAccumulator)
extends Iterator[Row] {
private var currentIterator: RowResultIterator = RowResultIterator.empty
@@ -145,6 +154,7 @@ private class RowIterator(
// Update timestampAccumulator with the client's last propagated
// timestamp on each executor.
kuduContext.timestampAccumulator.add(kuduContext.syncClient.getLastPropagatedTimestamp)
+ rowsRead.add(currentIterator.getNumRows)
}
KeepKuduScannerAlive()
currentIterator.hasNext
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
index fe56e75..ad2c110 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/DefaultSourceTest.scala
@@ -89,9 +89,11 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
new CreateTableOptions()
.setRangePartitionColumns(List("key").asJava)
.setNumReplicas(1))
+ val insertsBefore = kuduContext.numInserts.value
kuduContext.insertRows(df, tableName)
+ assertEquals(insertsBefore + df.count(), kuduContext.numInserts.value)
- // now use new options to refer to the new table name
+ // Now use new options to refer to the new table name.
val newOptions: Map[String, String] =
Map("kudu.table" -> tableName, "kudu.master" -> harness.getMasterAddressesAsString)
val checkDf = sqlContext.read.options(newOptions).format("kudu").load
@@ -127,7 +129,9 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
.addRangePartition(lower, upper)
.setNumReplicas(1)
)
+ val insertsBefore = kuduContext.numInserts.value
kuduContext.insertRows(df, tableName)
+ assertEquals(insertsBefore + df.count(), kuduContext.numInserts.value)
// now use new options to refer to the new table name
val newOptions: Map[String, String] =
@@ -144,12 +148,14 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
@Test
def testInsertion() {
+ val insertsBefore = kuduContext.numInserts.value
val df = sqlContext.read.options(kuduOptions).format("kudu").load
val changedDF = df
.limit(1)
.withColumn("key", df("key").plus(100))
.withColumn("c2_s", lit("abc"))
kuduContext.insertRows(changedDF, tableName)
+ assertEquals(insertsBefore + changedDF.count(), kuduContext.numInserts.value)
val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
val collected = newDF.filter("key = 100").collect()
@@ -160,12 +166,14 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
@Test
def testInsertionMultiple() {
+ val insertsBefore = kuduContext.numInserts.value
val df = sqlContext.read.options(kuduOptions).format("kudu").load
val changedDF = df
.limit(2)
.withColumn("key", df("key").plus(100))
.withColumn("c2_s", lit("abc"))
kuduContext.insertRows(changedDF, tableName)
+ assertEquals(insertsBefore + changedDF.count(), kuduContext.numInserts.value)
val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
val collected = newDF.filter("key = 100").collect()
@@ -181,28 +189,28 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
@Test
def testInsertionIgnoreRows() {
val df = sqlContext.read.options(kuduOptions).format("kudu").load
- val baseDF = df.limit(1) // filter down to just the first row
+ val baseDF = df.limit(1) // Filter down to just the first row.
// change the c2 string to abc and insert
val updateDF = baseDF.withColumn("c2_s", lit("abc"))
val kuduWriteOptions = KuduWriteOptions(ignoreDuplicateRowErrors = true)
kuduContext.insertRows(updateDF, tableName, kuduWriteOptions)
- // change the key and insert
+ // Change the key and insert.
val insertDF = df
.limit(1)
.withColumn("key", df("key").plus(100))
.withColumn("c2_s", lit("def"))
kuduContext.insertRows(insertDF, tableName, kuduWriteOptions)
- // read the data back
+ // Read the data back.
val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
val collectedUpdate = newDF.filter("key = 0").collect()
assertEquals("0", collectedUpdate(0).getAs[String]("c2_s"))
val collectedInsert = newDF.filter("key = 100").collect()
assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
- // restore the original state of the table
+ // Restore the original state of the table.
deleteRow(100)
}
@@ -299,28 +307,34 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
@Test
def testUpsertRows() {
val df = sqlContext.read.options(kuduOptions).format("kudu").load
- val baseDF = df.limit(1) // filter down to just the first row
+ val baseDF = df.limit(1) // Filter down to just the first row.
- // change the c2 string to abc and update
- val updateDF = baseDF.withColumn("c2_s", lit("abc"))
- kuduContext.upsertRows(updateDF, tableName)
+ // Change the c2 string to abc and update.
+ val upsertDF = baseDF.withColumn("c2_s", lit("abc"))
+ kuduContext.upsertRows(upsertDF, tableName)
- // change the key and insert
+ // Change the key and insert.
+ val upsertsBefore = kuduContext.numUpserts.value
val insertDF = df
.limit(1)
.withColumn("key", df("key").plus(100))
.withColumn("c2_s", lit("def"))
kuduContext.upsertRows(insertDF, tableName)
+ assertEquals(upsertsBefore + insertDF.count(), kuduContext.numUpserts.value)
- // read the data back
+ // Read the data back.
val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
val collectedUpdate = newDF.filter("key = 0").collect()
assertEquals("abc", collectedUpdate(0).getAs[String]("c2_s"))
val collectedInsert = newDF.filter("key = 100").collect()
assertEquals("def", collectedInsert(0).getAs[String]("c2_s"))
- // restore the original state of the table
- kuduContext.updateRows(baseDF.filter("key = 0").withColumn("c2_s", lit("0")), tableName)
+ // Restore the original state of the table, and test the numUpdates metric.
+ val updatesBefore = kuduContext.numUpdates.value
+ val updateDF = baseDF.filter("key = 0").withColumn("c2_s", lit("0"))
+ val updatesApplied = updateDF.count()
+ kuduContext.updateRows(updateDF, tableName)
+ assertEquals(updatesBefore + updatesApplied, kuduContext.numUpdates.value)
deleteRow(100)
}
@@ -392,14 +406,17 @@ class DefaultSourceTest extends KuduTestSuite with Matchers {
def testDeleteRows() {
val df = sqlContext.read.options(kuduOptions).format("kudu").load
val deleteDF = df.filter("key = 0").select("key")
+ val deletesBefore = kuduContext.numDeletes.value
+ val deletesApplied = deleteDF.count()
kuduContext.deleteRows(deleteDF, tableName)
+ assertEquals(deletesBefore + deletesApplied, kuduContext.numDeletes.value)
- // read the data back
+ // Read the data back.
val newDF = sqlContext.read.options(kuduOptions).format("kudu").load
val collectedDelete = newDF.filter("key = 0").collect()
assertEquals(0, collectedDelete.length)
- // restore the original state of the table
+ // Restore the original state of the table.
val insertDF = df.limit(1).filter("key = 0")
kuduContext.insertRows(insertDF, tableName)
}
diff --git a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
index 49bc15e..1904f95 100644
--- a/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
+++ b/java/kudu-spark/src/test/scala/org/apache/kudu/spark/kudu/KuduRDDTest.scala
@@ -26,6 +26,7 @@ import org.apache.kudu.test.KuduTestHarness.TabletServerConfig
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
+import org.junit.Assert.assertEquals
import org.junit.Test
class KuduRDDTest extends KuduTestSuite {
@@ -34,7 +35,8 @@ class KuduRDDTest extends KuduTestSuite {
def testCollectRows() {
insertRows(table, 100)
val rdd = kuduContext.kuduRDD(ss.sparkContext, tableName, List("key"))
- assert(rdd.collect.length == 100)
+ assertEquals(100, rdd.collect().length)
+ assertEquals(100L, rdd.asInstanceOf[KuduRDD].rowsRead.value)
}
@Test
[kudu] 02/02: [java] log warning if applying operation on a closed
session
Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 9443a69b0d81c65a174874661e90a13e5792d55b
Author: abc549825 <ab...@163.com>
AuthorDate: Wed Jan 9 19:23:37 2019 +0800
[java] log warning if applying operation on a closed session
Closed sessions won't flush on another close() call, so it's somewhat
dubious to apply() an operation to a closed session. This change adds a
warning if apply() is used with a closed session.
Originally this patch enforced the new behavior via precondition, but that's
a breaking change for clients that relied on the old broken behavior.
Change-Id: I0fe221fedbb91959985f5ee374f1b691be2426a9
Reviewed-on: http://gerrit.cloudera.org:8080/12237
Tested-by: Kudu Jenkins
Reviewed-by: Grant Henke <gr...@apache.org>
---
.../org/apache/kudu/client/AsyncKuduSession.java | 5 ++++
.../org/apache/kudu/client/TestKuduClient.java | 27 ++++++++++++++++++++--
2 files changed, 30 insertions(+), 2 deletions(-)
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
index 94ca290..7d649bd 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduSession.java
@@ -532,6 +532,11 @@ public class AsyncKuduSession implements SessionConfiguration {
Preconditions.checkArgument(operation.getTable().getAsyncClient() == client,
"Applied operations must be created from a KuduTable instance opened " +
"from the same client that opened this KuduSession");
+ if (closed) {
+ // Ideally this would be a precondition, but that may break existing
+ // clients who have grown to rely on this unsafe behavior.
+ LOG.warn("Applying an operation in a closed session; this is unsafe");
+ }
// Freeze the row so that the client can not concurrently modify it while it is in flight.
operation.getRow().freeze();
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 97df3e0..4584890 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -1071,7 +1071,6 @@ public class TestKuduClient {
session.apply(insert);
}
session.flush();
- session.close();
// Perform a bunch of READ_YOUR_WRITES scans to all the replicas
// that count the rows. And verify that the count of the rows
@@ -1114,7 +1113,7 @@ public class TestKuduClient {
future.get();
}
}
-
+
private void runTestCallDuringLeaderElection(String clientMethodName) throws Exception {
// This bit of reflection helps us avoid duplicating test code.
Method methodToInvoke = KuduClient.class.getMethod(clientMethodName);
@@ -1138,6 +1137,7 @@ public class TestKuduClient {
.build();
try {
methodToInvoke.invoke(cl);
+ fail();
} catch (InvocationTargetException ex) {
assertTrue(ex.getTargetException() instanceof KuduException);
KuduException realEx = (KuduException)ex.getTargetException();
@@ -1154,6 +1154,7 @@ public class TestKuduClient {
public void testGetHiveMetastoreConfigDuringLeaderElection() throws Exception {
runTestCallDuringLeaderElection("getHiveMetastoreConfig");
}
+
/**
* Test assignment of a location to the client.
*/
@@ -1173,4 +1174,26 @@ public class TestKuduClient {
client.listTabletServers();
assertEquals("/L0", client.getLocationString());
}
+
+ @Test(timeout = 100000)
+ public void testSessionOnceClosed() throws Exception {
+ client.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions());
+ KuduTable table = client.openTable(TABLE_NAME);
+ KuduSession session = client.newSession();
+
+ session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+ Insert insert = createBasicSchemaInsert(table, 0);
+ session.apply(insert);
+ session.close();
+ assertTrue(session.isClosed());
+
+ insert = createBasicSchemaInsert(table, 1);
+ CapturingLogAppender cla = new CapturingLogAppender();
+ try (Closeable c = cla.attach()) {
+ session.apply(insert);
+ }
+ String loggedText = cla.getAppendedText();
+ assertTrue("Missing warning:\n" + loggedText,
+ loggedText.contains("this is unsafe"));
+ }
}