Posted to commits@spark.apache.org by hv...@apache.org on 2023/03/10 23:52:16 UTC

[spark] branch branch-3.4 updated: [SPARK-42721][CONNECT] RPC logging interceptor

This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 516a202e3ed [SPARK-42721][CONNECT] RPC logging interceptor
516a202e3ed is described below

commit 516a202e3ed778487f9d2eaaeb6603193eb0a7b6
Author: Raghu Angadi <ra...@databricks.com>
AuthorDate: Fri Mar 10 19:51:55 2023 -0400

    [SPARK-42721][CONNECT] RPC logging interceptor
    
    ### What changes were proposed in this pull request?
    
    This adds a gRPC interceptor to the Spark Connect server that logs all incoming RPC requests and responses.
    
     - How to enable: set the interceptor config, e.g. (see also the custom-interceptor sketch after the sample output):
    
           ./sbin/start-connect-server.sh --conf spark.connect.grpc.interceptor.classes=org.apache.spark.sql.connect.service.LoggingInterceptor  --jars connector/connect/server/target/spark-connect_*-SNAPSHOT.jar
    
     - Sample output:
    
            23/03/08 10:54:37 INFO LoggingInterceptor: Received RPC Request spark.connect.SparkConnectService/ExecutePlan (id 1868663481):
            {
              "client_id": "6844bc44-4411-4481-8109-a10e3a836f97",
              "user_context": {
                "user_id": "raghu"
              },
              "plan": {
                "root": {
                  "common": {
                    "plan_id": "37"
                  },
                  "show_string": {
                    "input": {
                      "common": {
                        "plan_id": "36"
                      },
                      "read": {
                        "data_source": {
                          "format": "csv",
                          "schema": "",
                          "paths": ["file:///tmp/x-in"]
                        }
                      }
                    },
                    "num_rows": 20,
                    "truncate": 20
                  }
                }
              },
              "client_type": "_SPARK_CONNECT_PYTHON"
            }
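
     - Writing a custom interceptor: any class implementing io.grpc.ServerInterceptor
       can be plugged in through the same config (the registry presumably instantiates
       it via a no-arg constructor). A minimal sketch, with hypothetical package and
       class names:

            package com.example

            import io.grpc.{Metadata, ServerCall, ServerCallHandler, ServerInterceptor}

            // Logs only the full method name of each incoming RPC, then
            // delegates to the next handler in the chain unchanged.
            class MethodNameInterceptor extends ServerInterceptor {
              override def interceptCall[ReqT, RespT](
                  call: ServerCall[ReqT, RespT],
                  headers: Metadata,
                  next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
                println(s"RPC: ${call.getMethodDescriptor.getFullMethodName}")
                next.startCall(call, headers)
              }
            }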
    
    ### Why are the changes needed?
    This is useful during development, and it might also help debug problems in production.
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
     - Manually in development
     - Unit test
    
    Closes #40342 from rangadi/logging-interceptor.
    
    Authored-by: Raghu Angadi <ra...@databricks.com>
    Signed-off-by: Herman van Hovell <he...@databricks.com>
    (cherry picked from commit 19cb8d7014e03d828794a637bc67d09fc84650ad)
    Signed-off-by: Herman van Hovell <he...@databricks.com>
---
 connector/connect/server/pom.xml                   |  6 ++
 .../sql/connect/service/LoggingInterceptor.scala   | 75 ++++++++++++++++++++++
 .../connect/service/InterceptorRegistrySuite.scala |  9 +++
 3 files changed, 90 insertions(+)

diff --git a/connector/connect/server/pom.xml b/connector/connect/server/pom.xml
index a2aff8f9f31..302f6590fd2 100644
--- a/connector/connect/server/pom.xml
+++ b/connector/connect/server/pom.xml
@@ -155,6 +155,12 @@
       <version>${protobuf.version}</version>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java-util</artifactId>
+      <version>${protobuf.version}</version>
+      <scope>compile</scope>
+    </dependency>
     <dependency>
       <groupId>io.grpc</groupId>
       <artifactId>grpc-netty</artifactId>
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala
new file mode 100644
index 00000000000..c91075fd127
--- /dev/null
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/LoggingInterceptor.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import scala.util.Random
+
+import com.google.protobuf.Message
+import com.google.protobuf.util.JsonFormat
+import io.grpc.ForwardingServerCall.SimpleForwardingServerCall
+import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener
+import io.grpc.Metadata
+import io.grpc.ServerCall
+import io.grpc.ServerCallHandler
+import io.grpc.ServerInterceptor
+
+import org.apache.spark.internal.Logging
+
+/**
+ * A gRPC interceptor to log RPC requests and responses. It logs the protobufs as JSON.
+ * Useful for local development. An ID is logged for each RPC so that requests and corresponding
+ * responses can be exactly matched.
+ */
+class LoggingInterceptor extends ServerInterceptor with Logging {
+
+  private val jsonPrinter = JsonFormat.printer().preservingProtoFieldNames()
+
+  private def logProto[T](description: String, message: T): Unit = {
+    message match {
+      case m: Message =>
+        logInfo(s"$description:\n${jsonPrinter.print(m)}")
+      case other =>
+        logInfo(s"$description: (Unknown message type) $other")
+    }
+  }
+
+  override def interceptCall[ReqT, RespT](
+      call: ServerCall[ReqT, RespT],
+      headers: Metadata,
+      next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
+
+    val id = Random.nextInt(Int.MaxValue) // Assign a random id for this RPC.
+    val desc = s"${call.getMethodDescriptor.getFullMethodName} (id $id)"
+
+    val respLoggingCall = new SimpleForwardingServerCall[ReqT, RespT](call) {
+      override def sendMessage(message: RespT): Unit = {
+        logProto(s"Responding to RPC $desc", message)
+        super.sendMessage(message)
+      }
+    }
+
+    val listener = next.startCall(respLoggingCall, headers)
+
+    new SimpleForwardingServerCallListener[ReqT](listener) {
+      override def onMessage(message: ReqT): Unit = {
+        logProto(s"Received RPC request $desc", message)
+        super.onMessage(message)
+      }
+    }
+  }
+}
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/InterceptorRegistrySuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/InterceptorRegistrySuite.scala
index fb1b3bb9df1..7f85966f0a7 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/InterceptorRegistrySuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/InterceptorRegistrySuite.scala
@@ -175,4 +175,13 @@ class InterceptorRegistrySuite extends SharedSparkSession {
     }
   }
 
+  test("LoggingInterceptor initializes when configured in spark conf") {
+    withSparkConf(
+      Connect.CONNECT_GRPC_INTERCEPTOR_CLASSES.key ->
+        "org.apache.spark.sql.connect.service.LoggingInterceptor") {
+      val interceptors = SparkConnectInterceptorRegistry.createConfiguredInterceptors()
+      assert(interceptors.size == 1)
+      assert(interceptors.head.isInstanceOf[LoggingInterceptor])
+    }
+  }
 }
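
A note on the printer used in LoggingInterceptor.scala above: JsonFormat.printer().preservingProtoFieldNames()
keeps the original proto field names (e.g. "client_id", as seen in the sample output) instead of the
printer's default lowerCamelCase conversion ("clientId"). A standalone sketch of that behavior, using
the SourceContext well-known type bundled with protobuf-java (the object name is hypothetical):

    import com.google.protobuf.SourceContext
    import com.google.protobuf.util.JsonFormat

    object JsonNamesDemo {
      def main(args: Array[String]): Unit = {
        val msg = SourceContext.newBuilder().setFileName("demo.proto").build()
        // Default printer converts to lowerCamelCase: {"fileName": "demo.proto"}
        println(JsonFormat.printer().print(msg))
        // preservingProtoFieldNames keeps snake_case: {"file_name": "demo.proto"}
        println(JsonFormat.printer().preservingProtoFieldNames().print(msg))
      }
    }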


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org