You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2023/03/10 23:52:08 UTC
[spark] branch master updated: [SPARK-42721][CONNECT] RPC logging interceptor
This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 19cb8d7014e [SPARK-42721][CONNECT] RPC logging interceptor
19cb8d7014e is described below
commit 19cb8d7014e03d828794a637bc67d09fc84650ad
Author: Raghu Angadi <ra...@databricks.com>
AuthorDate: Fri Mar 10 19:51:55 2023 -0400
[SPARK-42721][CONNECT] RPC logging interceptor
### What changes were proposed in this pull request?
This adds a gRPC interceptor to the Spark Connect server. It logs all incoming RPC requests and outgoing responses.
- How to enable: set the interceptor config, e.g.:
./sbin/start-connect-server.sh --conf spark.connect.grpc.interceptor.classes=org.apache.spark.sql.connect.service.LoggingInterceptor --jars connector/connect/server/target/spark-connect_*-SNAPSHOT.jar
- Sample output:
23/03/08 10:54:37 INFO LoggingInterceptor: Received RPC request spark.connect.SparkConnectService/ExecutePlan (id 1868663481):
{
"client_id": "6844bc44-4411-4481-8109-a10e3a836f97",
"user_context": {
"user_id": "raghu"
},
"plan": {
"root": {
"common": {
"plan_id": "37"
},
"show_string": {
"input": {
"common": {
"plan_id": "36"
},
"read": {
"data_source": {
"format": "csv",
"schema": "",
"paths": ["file:///tmp/x-in"]
}
}
},
"num_rows": 20,
"truncate": 20
}
}
},
"client_type": "_SPARK_CONNECT_PYTHON"
}
### Why are the changes needed?
This is useful in development. It might be useful to debug some problems in production as well.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
- Manually in development
- Unit test
Closes #40342 from rangadi/logging-interceptor.
Authored-by: Raghu Angadi <ra...@databricks.com>
Signed-off-by: Herman van Hovell <he...@databricks.com>
---
connector/connect/server/pom.xml | 6 ++
.../sql/connect/service/LoggingInterceptor.scala | 75 ++++++++++++++++++++++
.../connect/service/InterceptorRegistrySuite.scala | 9 +++
3 files changed, 90 insertions(+)
diff --git a/connector/connect/server/pom.xml b/connector/connect/server/pom.xml
index 35b6ae60a16..079d07db362 100644
--- a/connector/connect/server/pom.xml
+++ b/connector/connect/server/pom.xml
@@ -155,6 +155,12 @@
<version>${protobuf.version}</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java-util</artifactId>
+ <version>${protobuf.version}</version>
+ <scope>compile</scope>
+ </dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala
new file mode 100644
index 00000000000..c91075fd127
--- /dev/null
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.util.Random
+
+import com.google.protobuf.Message
+import com.google.protobuf.util.JsonFormat
+import io.grpc.ForwardingServerCall.SimpleForwardingServerCall
+import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener
+import io.grpc.Metadata
+import io.grpc.ServerCall
+import io.grpc.ServerCallHandler
+import io.grpc.ServerInterceptor
+
+import org.apache.spark.internal.Logging
+
+/**
+ * A gRPC interceptor to log RPC requests and responses. It logs the protobufs as JSON.
+ * Useful for local development. An ID is logged for each RPC so that requests and corresponding
+ * responses can be exactly matched. Logging is best-effort: if a message cannot be converted to
+ * JSON, its `toString` form is logged instead and the RPC proceeds normally.
+ */
+class LoggingInterceptor extends ServerInterceptor with Logging {
+  import scala.util.control.NonFatal
+
+  private val jsonPrinter = JsonFormat.printer().preservingProtoFieldNames()
+
+  /**
+   * Renders `m` as JSON. `Printer.print` is declared to throw (e.g. for `Any` fields whose type
+   * is not registered with the printer); a logging interceptor must never fail the call it
+   * observes, so any non-fatal conversion error falls back to the message's text form.
+   */
+  private def toJson(m: Message): String =
+    try jsonPrinter.print(m)
+    catch { case NonFatal(e) => s"(JSON conversion failed: ${e.getMessage})\n$m" }
+
+  /** Logs `message` at INFO, rendered as JSON when it is a protobuf [[Message]]. */
+  private def logProto[T](description: String, message: T): Unit = {
+    message match {
+      case m: Message =>
+        logInfo(s"$description:\n${toJson(m)}")
+      case other =>
+        logInfo(s"$description: (Unknown message type) $other")
+    }
+  }
+
+  override def interceptCall[ReqT, RespT](
+      call: ServerCall[ReqT, RespT],
+      headers: Metadata,
+      next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
+
+    val id = Random.nextInt(Int.MaxValue) // Assign a random id for this RPC.
+    val desc = s"${call.getMethodDescriptor.getFullMethodName} (id $id)"
+
+    // Wrap the outgoing call so every response is logged before it is forwarded.
+    val respLoggingCall = new SimpleForwardingServerCall[ReqT, RespT](call) {
+      override def sendMessage(message: RespT): Unit = {
+        logProto(s"Responding to RPC $desc", message)
+        super.sendMessage(message)
+      }
+    }
+
+    val listener = next.startCall(respLoggingCall, headers)
+
+    // Wrap the downstream listener so every request is logged before it is forwarded.
+    new SimpleForwardingServerCallListener[ReqT](listener) {
+      override def onMessage(message: ReqT): Unit = {
+        logProto(s"Received RPC request $desc", message)
+        super.onMessage(message)
+      }
+    }
+  }
+}
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/InterceptorRegistrySuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/InterceptorRegistrySuite.scala
index fb1b3bb9df1..7f85966f0a7 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/InterceptorRegistrySuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/InterceptorRegistrySuite.scala
@@ -175,4 +175,13 @@ class InterceptorRegistrySuite extends SharedSparkSession {
}
}
+  test("LoggingInterceptor initializes when configured in spark conf") {
+    // Register the interceptor by its fully qualified class name, as done via --conf.
+    val interceptorConf = Connect.CONNECT_GRPC_INTERCEPTOR_CLASSES.key ->
+      "org.apache.spark.sql.connect.service.LoggingInterceptor"
+    withSparkConf(interceptorConf) {
+      val created = SparkConnectInterceptorRegistry.createConfiguredInterceptors()
+      // Exactly one interceptor is configured, and it must be the logging one.
+      assert(created.size == 1)
+      assert(created.head.isInstanceOf[LoggingInterceptor])
+    }
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org