Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/10/20 11:50:21 UTC

[GitHub] [spark] grundprinzip opened a new pull request, #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

grundprinzip opened a new pull request, #38320:
URL: https://github.com/apache/spark/pull/38320

   ### What changes were proposed in this pull request?
   To modify incoming requests for the Spark Connect GRPC service, for example to translate metadata from the HTTP/2 request into values in the proto message, the GRPC service needs to be configured with an interceptor.
   
   This patch adds two ways to configure interceptors for the GRPC service. First, we can now configure interceptors in the `SparkConnectInterceptorRegistry` by adding a value to the `interceptorChain` like in the example below:
   
   ```
   object SparkConnectInterceptorRegistry {
   
     // Contains the list of configured interceptors.
     private lazy val interceptorChain: Seq[InterceptorBuilder] = Seq(
       interceptor[LoggingInterceptor](classOf[LoggingInterceptor])
     )
     // ...
   }
   ```
   
   The second way is to configure interceptors through Spark configuration values at startup. For this, a new config key has been added: `spark.connect.grpc.interceptor.classes`. Its value is a comma-separated list of class names that are added as interceptors to the system.
   
   ```
   ./bin/pyspark --conf spark.connect.grpc.interceptor.classes=com.my.important.LoggingInterceptor
   ```
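   
   As a rough illustration of how a comma-separated class list like this could be turned into instances, the sketch below splits the value and instantiates each class through its no-argument constructor. `Interceptor`, `LoggingInterceptor`, and `loadInterceptors` are hypothetical stand-ins and not the code in this PR; the real service works with `io.grpc.ServerInterceptor`.
   
   ```scala
   // Hypothetical stand-in for io.grpc.ServerInterceptor so the sketch has no dependencies.
   trait Interceptor { def name: String }
   
   // Hypothetical interceptor with a public no-argument constructor.
   class LoggingInterceptor extends Interceptor {
     override def name: String = "logging"
   }
   
   object InterceptorLoading {
     // Splits a comma-separated list of class names and instantiates each class
     // through its no-argument constructor, preserving the configured order.
     def loadInterceptors(classes: String): Seq[Interceptor] =
       classes
         .split(",")
         .map(_.trim)
         .filter(_.nonEmpty)
         .toSeq
         .map { className =>
           val cls = Class.forName(className)
           if (!classOf[Interceptor].isAssignableFrom(cls)) {
             throw new IllegalArgumentException(s"$className is not an Interceptor")
           }
           // Throws NoSuchMethodException if there is no no-argument constructor.
           cls.getDeclaredConstructor().newInstance().asInstanceOf[Interceptor]
         }
   }
   ```
   
   Blank entries are dropped here, so an empty or trailing-comma value yields no extra interceptors; whether the actual implementation tolerates them is not stated in this description.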
   
   During startup, all interceptors are added to the `NettyServerBuilder` in order.
   
   ```
   // Add all registered interceptors to the server builder.
   SparkConnectInterceptorRegistry.chainInterceptors(sb)
   ```
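   
   Conceptually, chaining amounts to folding the configured list onto the builder in list order. The sketch below illustrates this with a hypothetical `FakeServerBuilder` in place of `NettyServerBuilder`; the `chainInterceptors` shown here is an illustration under that assumption, not the method added by this PR.
   
   ```scala
   // Hypothetical, immutable stand-in for a server builder; it only records registrations.
   final case class FakeServerBuilder(interceptors: Vector[String] = Vector.empty) {
     def intercept(name: String): FakeServerBuilder = copy(interceptors = interceptors :+ name)
   }
   
   object ChainSketch {
     // Registers every interceptor on the builder, preserving the order of the list.
     def chainInterceptors(sb: FakeServerBuilder, chain: Seq[String]): FakeServerBuilder =
       chain.foldLeft(sb)((builder, name) => builder.intercept(name))
   }
   ```
   
   With this shape, statically compiled entries can simply be placed before the dynamically configured ones in the list that is passed in.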
   
   ### Why are the changes needed?
   Provide a configurable and extensible way to configure interceptors.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Unit Tests
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] amaliujia commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1001283755


##########
core/src/main/resources/error/error-classes.json:
##########
@@ -698,6 +698,23 @@
     ],
     "sqlState" : "22023"
   },
+  "SPARK_CONNECT" : {

Review Comment:
   Nit: we might not need `SPARK_` within the Spark repo? `CONNECT` is the project/component name.





[GitHub] [spark] cloud-fan commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1003954578


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala:
##########
@@ -26,4 +26,12 @@ private[spark] object Connect {
       .intConf
       .createWithDefault(15002)
 
+  val CONNECT_GRPC_INTERCEPTOR_CLASSES =

Review Comment:
   This is in the connect module.





[GitHub] [spark] grundprinzip commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1001531871


##########
core/src/main/resources/error/error-classes.json:
##########
@@ -698,6 +698,23 @@
     ],
     "sqlState" : "22023"
   },
+  "SPARK_CONNECT" : {

Review Comment:
   Done.





[GitHub] [spark] cloud-fan commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1004090746


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala:
##########
@@ -26,4 +26,12 @@ private[spark] object Connect {
       .intConf
       .createWithDefault(15002)
 
+  val CONNECT_GRPC_INTERCEPTOR_CLASSES =

Review Comment:
   StaticSQLConf and SQLConf are in the SQL module, it's weird to add spark connect configs there...





[GitHub] [spark] cloud-fan commented on pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #38320:
URL: https://github.com/apache/spark/pull/38320#issuecomment-1289184249

   thanks, merging to master!




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1004088030


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala:
##########
@@ -26,4 +26,12 @@ private[spark] object Connect {
       .intConf
       .createWithDefault(15002)
 
+  val CONNECT_GRPC_INTERCEPTOR_CLASSES =

Review Comment:
   My point is that it's mixed. It's in SQL package but the configuration being used is `SparkConf`.





[GitHub] [spark] amaliujia commented on pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on PR #38320:
URL: https://github.com/apache/spark/pull/38320#issuecomment-1287316245

   LGTM




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1003955956


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.lang.reflect.InvocationTargetException
+
+import io.grpc.ServerInterceptor
+import io.grpc.netty.NettyServerBuilder
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.util.Utils
+
+/**
+ * This object provides a global list of configured interceptors for GRPC. The interceptors are
+ * added to the GRPC server in order of their position in the list. Once the statically compiled
+ * interceptors are added, dynamically configured interceptors are added.
+ */
+object SparkConnectInterceptorRegistry {

Review Comment:
   It is not .. unless we document so ... Should probably either document it like https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala or explicitly make it private/public.





[GitHub] [spark] amaliujia commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1001279885


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala:
##########
@@ -26,4 +26,9 @@ private[spark] object Connect {
       .intConf
       .createWithDefault(15002)
 
+  val CONNECT_GRPC_INTERCEPTOR_CLASSES =
+    ConfigBuilder("spark.connect.grpc.interceptor.classes")
+      .version("3.4.0")

Review Comment:
   Add `.doc()` to document that the class name separator is a comma.



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -4276,4 +4293,4 @@
       "Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting <autoBroadcastjoinThreshold> to -1 or increase the spark driver memory by setting <driverMemory> to a higher value<analyzeTblMsg>"
     ]
   }
-}
+}

Review Comment:
   This can be reverted?



##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/service/InterceptorRegistryTest.scala:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import io.grpc.{Metadata, ServerCall, ServerCallHandler, ServerInterceptor}
+import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener
+import io.grpc.netty.NettyServerBuilder
+
+import org.apache.spark.{SparkEnv, SparkIllegalArgumentException, SparkRuntimeException}
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Used for testing only.
+ */
+class TestingInterceptorNoTrivialCtor(id: Int) extends ServerInterceptor {
+  override def interceptCall[ReqT, RespT](
+      call: ServerCall[ReqT, RespT],
+      headers: Metadata,
+      next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
+    val listener = next.startCall(call, headers)
+    new SimpleForwardingServerCallListener[ReqT](listener) {
+      override def onMessage(message: ReqT): Unit = {
+        delegate().onMessage(message)
+      }
+    }
+  }
+}
+
+/**
+ * Used for testing only.
+ */
+class TestingInterceptorInstantiationError extends ServerInterceptor {
+  throw new ArrayIndexOutOfBoundsException("Bad Error")
+
+  override def interceptCall[ReqT, RespT](
+      call: ServerCall[ReqT, RespT],
+      headers: Metadata,
+      next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
+    val listener = next.startCall(call, headers)
+    new SimpleForwardingServerCallListener[ReqT](listener) {
+      override def onMessage(message: ReqT): Unit = {
+        delegate().onMessage(message)
+      }
+    }
+  }
+}
+
+class InterceptorRegistryTest extends SharedSparkSession {
+
+  override def beforeEach(): Unit = {
+    if (SparkEnv.get.conf.contains(Connect.CONNECT_GRPC_INTERCEPTOR_CLASSES)) {
+      SparkEnv.get.conf.remove(Connect.CONNECT_GRPC_INTERCEPTOR_CLASSES)

Review Comment:
   hmmm this seems to be `SQLConf` only. I am not sure actually for non SQL config though.



##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/service/InterceptorRegistryTest.scala:
##########
@@ -0,0 +1,134 @@
+/*

Review Comment:
   Rename this file to `InterceptorRegistrySuite`?



##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.lang.reflect.InvocationTargetException
+
+import io.grpc.{Metadata, ServerCall, ServerCallHandler, ServerInterceptor}
+import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener
+import io.grpc.netty.NettyServerBuilder
+
+import org.apache.spark.{SparkEnv, SparkIllegalArgumentException, SparkRuntimeException}
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.util.Utils
+
+/**
+ * Used for testing only, does not do anything.
+ */
+class DummyInterceptor extends ServerInterceptor {

Review Comment:
   Looks like this class is loaded by class name? In this case maybe just move this class to test directory?



##########
core/src/main/resources/error/error-classes.json:
##########
@@ -698,6 +698,23 @@
     ],
     "sqlState" : "22023"
   },
+  "SPARK_CONNECT" : {

Review Comment:
   Nit: we might not need `SPARK_` within the Spark repo? `CONNECT` is the project/component name.



##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/service/InterceptorRegistryTest.scala:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import io.grpc.{Metadata, ServerCall, ServerCallHandler, ServerInterceptor}
+import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener
+import io.grpc.netty.NettyServerBuilder
+
+import org.apache.spark.{SparkEnv, SparkIllegalArgumentException, SparkRuntimeException}
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Used for testing only.
+ */
+class TestingInterceptorNoTrivialCtor(id: Int) extends ServerInterceptor {
+  override def interceptCall[ReqT, RespT](
+      call: ServerCall[ReqT, RespT],
+      headers: Metadata,
+      next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
+    val listener = next.startCall(call, headers)
+    new SimpleForwardingServerCallListener[ReqT](listener) {
+      override def onMessage(message: ReqT): Unit = {
+        delegate().onMessage(message)
+      }
+    }
+  }
+}
+
+/**
+ * Used for testing only.
+ */
+class TestingInterceptorInstantiationError extends ServerInterceptor {
+  throw new ArrayIndexOutOfBoundsException("Bad Error")
+
+  override def interceptCall[ReqT, RespT](
+      call: ServerCall[ReqT, RespT],
+      headers: Metadata,
+      next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
+    val listener = next.startCall(call, headers)
+    new SimpleForwardingServerCallListener[ReqT](listener) {
+      override def onMessage(message: ReqT): Unit = {
+        delegate().onMessage(message)
+      }
+    }
+  }
+}
+
+class InterceptorRegistryTest extends SharedSparkSession {
+
+  override def beforeEach(): Unit = {
+    if (SparkEnv.get.conf.contains(Connect.CONNECT_GRPC_INTERCEPTOR_CLASSES)) {
+      SparkEnv.get.conf.remove(Connect.CONNECT_GRPC_INTERCEPTOR_CLASSES)

Review Comment:
   You don't need this. There is a way to do per test case config setting. One example is 
   
   ```
     test("SPARK-40389: Don't eliminate a cast which can cause overflow") {
       withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
         withTable("dt") {
           sql("create table dt using parquet as select 9000000000BD as d")
           val msg = intercept[SparkException] {
             sql("select cast(cast(d as int) as long) from dt").collect()
           }.getCause.getMessage
           assert(msg.contains("[CAST_OVERFLOW]"))
         }
       }
     }
   ```





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1003955262


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala:
##########
@@ -26,4 +26,12 @@ private[spark] object Connect {
       .intConf
       .createWithDefault(15002)
 
+  val CONNECT_GRPC_INTERCEPTOR_CLASSES =

Review Comment:
   but it's under `apache.spark.sql.connect`. Should probably move it to `apache.spark.connect`?





[GitHub] [spark] cloud-fan commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1004115757


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala:
##########
@@ -26,4 +26,12 @@ private[spark] object Connect {
       .intConf
       .createWithDefault(15002)
 
+  val CONNECT_GRPC_INTERCEPTOR_CLASSES =

Review Comment:
   I'm not sure what the benefit of doing so is. Config definition can be anywhere and we can still use SQLConf to access it, e.g. `SQLConf.get.getConf(CONNECT_GRPC_INTERCEPTOR_CLASSES)`.





[GitHub] [spark] grundprinzip commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1001511084


##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/service/InterceptorRegistryTest.scala:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import io.grpc.{Metadata, ServerCall, ServerCallHandler, ServerInterceptor}
+import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener
+import io.grpc.netty.NettyServerBuilder
+
+import org.apache.spark.{SparkEnv, SparkIllegalArgumentException, SparkRuntimeException}
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Used for testing only.
+ */
+class TestingInterceptorNoTrivialCtor(id: Int) extends ServerInterceptor {
+  override def interceptCall[ReqT, RespT](
+      call: ServerCall[ReqT, RespT],
+      headers: Metadata,
+      next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
+    val listener = next.startCall(call, headers)
+    new SimpleForwardingServerCallListener[ReqT](listener) {
+      override def onMessage(message: ReqT): Unit = {
+        delegate().onMessage(message)
+      }
+    }
+  }
+}
+
+/**
+ * Used for testing only.
+ */
+class TestingInterceptorInstantiationError extends ServerInterceptor {
+  throw new ArrayIndexOutOfBoundsException("Bad Error")
+
+  override def interceptCall[ReqT, RespT](
+      call: ServerCall[ReqT, RespT],
+      headers: Metadata,
+      next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
+    val listener = next.startCall(call, headers)
+    new SimpleForwardingServerCallListener[ReqT](listener) {
+      override def onMessage(message: ReqT): Unit = {
+        delegate().onMessage(message)
+      }
+    }
+  }
+}
+
+class InterceptorRegistryTest extends SharedSparkSession {
+
+  override def beforeEach(): Unit = {
+    if (SparkEnv.get.conf.contains(Connect.CONNECT_GRPC_INTERCEPTOR_CLASSES)) {
+      SparkEnv.get.conf.remove(Connect.CONNECT_GRPC_INTERCEPTOR_CLASSES)

Review Comment:
   I can create a helper for this since there does not seem to be a more generic SparkConf helper.
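   
   A minimal sketch of what such a helper might look like, assuming a set-then-restore pattern similar to `withSQLConf`. `SimpleConf` is a hypothetical stand-in for `SparkConf`, and `withConf` is an illustrative name rather than the helper that ended up in the PR.
   
   ```scala
   import scala.collection.mutable
   
   // Hypothetical stand-in for SparkConf: a mutable string-to-string map.
   final class SimpleConf {
     private val settings = mutable.Map.empty[String, String]
     def set(key: String, value: String): Unit = settings(key) = value
     def remove(key: String): Unit = settings.remove(key)
     def getOption(key: String): Option[String] = settings.get(key)
   }
   
   object ConfHelper {
     // Applies the given pairs, runs the body, then restores the previous values
     // even if the body throws.
     def withConf[T](conf: SimpleConf)(pairs: (String, String)*)(body: => T): T = {
       val previous = pairs.map { case (key, _) => key -> conf.getOption(key) }
       pairs.foreach { case (key, value) => conf.set(key, value) }
       try body
       finally previous.foreach {
         case (key, Some(old)) => conf.set(key, old)
         case (key, None) => conf.remove(key)
       }
     }
   }
   ```
   
   Restoring in a `finally` block keeps each test case isolated, which would make a manual cleanup in `beforeEach` unnecessary.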





[GitHub] [spark] cloud-fan commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1003958927


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala:
##########
@@ -26,4 +26,12 @@ private[spark] object Connect {
       .intConf
       .createWithDefault(15002)
 
+  val CONNECT_GRPC_INTERCEPTOR_CLASSES =

Review Comment:
   Right now, everything in Spark Connect is under `apache.spark.sql.connect`. Are you proposing an overall package renaming?





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1003988774


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala:
##########
@@ -26,4 +26,12 @@ private[spark] object Connect {
       .intConf
       .createWithDefault(15002)
 
+  val CONNECT_GRPC_INTERCEPTOR_CLASSES =

Review Comment:
   Yeah, I am proposing before it's too late.
   
   Either: if we target to cover other components too, should probably rename them before it's too late. For PySpark too, should probably move it from `pyspark.sql.connect.DataFrame` -> `pyspark.connect.sql.DataFrame`.
   
   Or, use `StaticSQLConf` since we're in the SQL package. We're doing this in the Hive Thrift server, Hive modules, etc.





[GitHub] [spark] grundprinzip commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1001507879


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.lang.reflect.InvocationTargetException
+
+import io.grpc.{Metadata, ServerCall, ServerCallHandler, ServerInterceptor}
+import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener
+import io.grpc.netty.NettyServerBuilder
+
+import org.apache.spark.{SparkEnv, SparkIllegalArgumentException, SparkRuntimeException}
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.util.Utils
+
+/**
+ * Used for testing only, does not do anything.
+ */
+class DummyInterceptor extends ServerInterceptor {

Review Comment:
   will do





[GitHub] [spark] amaliujia commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1004978147


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.lang.reflect.InvocationTargetException
+
+import io.grpc.ServerInterceptor
+import io.grpc.netty.NettyServerBuilder
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.util.Utils
+
+/**
+ * This object provides a global list of configured interceptors for GRPC. The interceptors are
+ * added to the GRPC server in order of their position in the list. Once the statically compiled
+ * interceptors are added, dynamically configured interceptors are added.
+ */
+object SparkConnectInterceptorRegistry {

Review Comment:
   Makes sense. Let me check for places that are intentionally private or internal API but are documented. I can follow up on it.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1003942285


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala:
##########
@@ -26,4 +26,12 @@ private[spark] object Connect {
       .intConf
       .createWithDefault(15002)
 
+  val CONNECT_GRPC_INTERCEPTOR_CLASSES =

Review Comment:
   Just realized that this is under `org.apache.spark.sql` ... we should either move this module out of `sql` or, ideally, use `StaticSQLConf`.





[GitHub] [spark] cloud-fan commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1002900934


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.lang.reflect.InvocationTargetException
+
+import io.grpc.ServerInterceptor
+import io.grpc.netty.NettyServerBuilder
+
+import org.apache.spark.{SparkEnv, SparkIllegalArgumentException, SparkRuntimeException}
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.util.Utils
+
+/**
+ * This object provides a global list of configured interceptors for GRPC. The interceptors are
+ * added to the GRPC server in order of their position in the list. Once the statically compiled
+ * interceptors are added, dynamically configured interceptors are added.
+ */
+object SparkConnectInterceptorRegistry {
+
+  // Contains the list of configured interceptors.
+  private lazy val interceptorChain: Seq[InterceptorBuilder] = Seq(
+    // Adding a new interceptor at compile time works like the example below with the dummy
+    // interceptor:
+    // interceptor[DummyInterceptor](classOf[DummyInterceptor])
+  )
+
+  /**
+   * Given a NettyServerBuilder instance, will chain all interceptors to it in reverse order.
+   * @param sb
+   */
+  def chainInterceptors(sb: NettyServerBuilder): Unit = {
+    interceptorChain.foreach(i => sb.intercept(i()))
+    createConfiguredInterceptors().foreach(sb.intercept(_))
+  }
+
+  // Type used to identify the closure responsible to instantiate a ServerInterceptor.
+  type InterceptorBuilder = () => ServerInterceptor
+
+  /**
+   * For testing only.
+   * @return
+   */
+  def createConfiguredInterceptors(): Seq[ServerInterceptor] = {
+    // Check all values from the Spark conf.
+    val classes = SparkEnv.get.conf.get(Connect.CONNECT_GRPC_INTERCEPTOR_CLASSES)
+    if (classes.nonEmpty) {
+      classes.get
+        .split(",")
+        .map(_.trim)
+        .filter(_.nonEmpty)
+        .map(Utils.classForName[ServerInterceptor](_))
+        .map(createInstance(_))
+    } else {
+      Seq.empty
+    }
+  }
+
+  /**
+   * Creates a new instance of T using the default constructor.
+   * @param cls
+   * @tparam T
+   * @return
+   */
+  private def createInstance[T <: ServerInterceptor](cls: Class[T]): ServerInterceptor = {
+    val ctorOpt = cls.getConstructors.find(_.getParameterCount == 0)
+    if (ctorOpt.isEmpty) {
+      throw new SparkIllegalArgumentException(

Review Comment:
   With error classes, we don't need different exception classes anymore. We can just throw a new `SparkException` here.
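
A minimal, Spark-free sketch of that suggestion: every failure path in `createInstance` throws the same exception type and is distinguished only by its error class. `ErrorClassException`, `Interceptor`, and the two sample interceptor classes are hypothetical stand-ins so the snippet compiles without Spark or gRPC; the error class names are the ones from the diff.

```scala
import java.lang.reflect.InvocationTargetException

object InterceptorSketch {

  // Hypothetical stand-in for an exception that carries an error class,
  // playing the role the review assigns to SparkException.
  final class ErrorClassException(
      val errorClass: String,
      val messageParameters: Map[String, String],
      cause: Throwable = null)
    extends Exception(s"[$errorClass] $messageParameters", cause)

  // Hypothetical stand-ins for io.grpc.ServerInterceptor implementations.
  trait Interceptor
  class GoodInterceptor extends Interceptor
  class NeedsArgInterceptor(name: String) extends Interceptor

  // One exception type for all failures; the error class tells them apart.
  def createInstance[T <: Interceptor](cls: Class[T]): Interceptor = {
    cls.getConstructors.find(_.getParameterCount == 0) match {
      case None =>
        throw new ErrorClassException(
          errorClass = "CONNECT.INTERCEPTOR_CTOR_MISSING",
          messageParameters = Map("cls" -> cls.getName))
      case Some(ctor) =>
        try {
          ctor.newInstance().asInstanceOf[T]
        } catch {
          case e: InvocationTargetException =>
            throw new ErrorClassException(
              errorClass = "CONNECT.INTERCEPTOR_RUNTIME_ERROR",
              messageParameters =
                Map("msg" -> String.valueOf(e.getTargetException.getMessage)),
              cause = e)
        }
    }
  }
}
```

Callers can then match on `errorClass` rather than on the exception type, which is the simplification the comment is after.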





[GitHub] [spark] cloud-fan commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1004001685


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala:
##########
@@ -26,4 +26,12 @@ private[spark] object Connect {
       .intConf
       .createWithDefault(15002)
 
+  val CONNECT_GRPC_INTERCEPTOR_CLASSES =

Review Comment:
   Spark Connect is really a SQL thing (DF API, no RDD support). The current package name looks fine.





[GitHub] [spark] cloud-fan commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1002900997


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.lang.reflect.InvocationTargetException
+
+import io.grpc.ServerInterceptor
+import io.grpc.netty.NettyServerBuilder
+
+import org.apache.spark.{SparkEnv, SparkIllegalArgumentException, SparkRuntimeException}
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.util.Utils
+
+/**
+ * This object provides a global list of configured interceptors for GRPC. The interceptors are
+ * added to the GRPC server in order of their position in the list. Once the statically compiled
+ * interceptors are added, dynamically configured interceptors are added.
+ */
+object SparkConnectInterceptorRegistry {
+
+  // Contains the list of configured interceptors.
+  private lazy val interceptorChain: Seq[InterceptorBuilder] = Seq(
+    // Adding a new interceptor at compile time works like the example below with the dummy
+    // interceptor:
+    // interceptor[DummyInterceptor](classOf[DummyInterceptor])
+  )
+
+  /**
+   * Given a NettyServerBuilder instance, will chain all interceptors to it in reverse order.
+   * @param sb
+   */
+  def chainInterceptors(sb: NettyServerBuilder): Unit = {
+    interceptorChain.foreach(i => sb.intercept(i()))
+    createConfiguredInterceptors().foreach(sb.intercept(_))
+  }
+
+  // Type used to identify the closure responsible to instantiate a ServerInterceptor.
+  type InterceptorBuilder = () => ServerInterceptor
+
+  /**
+   * For testing only.
+   * @return
+   */
+  def createConfiguredInterceptors(): Seq[ServerInterceptor] = {
+    // Check all values from the Spark conf.
+    val classes = SparkEnv.get.conf.get(Connect.CONNECT_GRPC_INTERCEPTOR_CLASSES)
+    if (classes.nonEmpty) {
+      classes.get
+        .split(",")
+        .map(_.trim)
+        .filter(_.nonEmpty)
+        .map(Utils.classForName[ServerInterceptor](_))
+        .map(createInstance(_))
+    } else {
+      Seq.empty
+    }
+  }
+
+  /**
+   * Creates a new instance of T using the default constructor.
+   * @param cls
+   * @tparam T
+   * @return
+   */
+  private def createInstance[T <: ServerInterceptor](cls: Class[T]): ServerInterceptor = {
+    val ctorOpt = cls.getConstructors.find(_.getParameterCount == 0)
+    if (ctorOpt.isEmpty) {
+      throw new SparkIllegalArgumentException(
+        errorClass = "CONNECT.INTERCEPTOR_CTOR_MISSING",
+        messageParameters = Map("cls" -> cls.getName))
+    }
+    try {
+      ctorOpt.get.newInstance().asInstanceOf[T]
+    } catch {
+      case e: InvocationTargetException =>
+        throw new SparkRuntimeException(

Review Comment:
   ditto



##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.lang.reflect.InvocationTargetException
+
+import io.grpc.ServerInterceptor
+import io.grpc.netty.NettyServerBuilder
+
+import org.apache.spark.{SparkEnv, SparkIllegalArgumentException, SparkRuntimeException}
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.util.Utils
+
+/**
+ * This object provides a global list of configured interceptors for GRPC. The interceptors are
+ * added to the GRPC server in order of their position in the list. Once the statically compiled
+ * interceptors are added, dynamically configured interceptors are added.
+ */
+object SparkConnectInterceptorRegistry {
+
+  // Contains the list of configured interceptors.
+  private lazy val interceptorChain: Seq[InterceptorBuilder] = Seq(
+    // Adding a new interceptor at compile time works like the example below with the dummy
+    // interceptor:
+    // interceptor[DummyInterceptor](classOf[DummyInterceptor])
+  )
+
+  /**
+   * Given a NettyServerBuilder instance, will chain all interceptors to it in reverse order.
+   * @param sb
+   */
+  def chainInterceptors(sb: NettyServerBuilder): Unit = {
+    interceptorChain.foreach(i => sb.intercept(i()))
+    createConfiguredInterceptors().foreach(sb.intercept(_))
+  }
+
+  // Type used to identify the closure responsible to instantiate a ServerInterceptor.
+  type InterceptorBuilder = () => ServerInterceptor
+
+  /**
+   * For testing only.
+   * @return
+   */
+  def createConfiguredInterceptors(): Seq[ServerInterceptor] = {
+    // Check all values from the Spark conf.
+    val classes = SparkEnv.get.conf.get(Connect.CONNECT_GRPC_INTERCEPTOR_CLASSES)
+    if (classes.nonEmpty) {
+      classes.get
+        .split(",")
+        .map(_.trim)
+        .filter(_.nonEmpty)
+        .map(Utils.classForName[ServerInterceptor](_))
+        .map(createInstance(_))
+    } else {
+      Seq.empty
+    }
+  }
+
+  /**
+   * Creates a new instance of T using the default constructor.
+   * @param cls
+   * @tparam T
+   * @return
+   */
+  private def createInstance[T <: ServerInterceptor](cls: Class[T]): ServerInterceptor = {
+    val ctorOpt = cls.getConstructors.find(_.getParameterCount == 0)
+    if (ctorOpt.isEmpty) {
+      throw new SparkIllegalArgumentException(
+        errorClass = "CONNECT.INTERCEPTOR_CTOR_MISSING",
+        messageParameters = Map("cls" -> cls.getName))
+    }
+    try {
+      ctorOpt.get.newInstance().asInstanceOf[T]
+    } catch {
+      case e: InvocationTargetException =>
+        throw new SparkRuntimeException(
+          errorClass = "CONNECT.INTERCEPTOR_RUNTIME_ERROR",
+          messageParameters = Map("msg" -> e.getTargetException.getMessage),
+          cause = e)
+      case e: Exception =>
+        throw new SparkRuntimeException(

Review Comment:
   ditto





[GitHub] [spark] cloud-fan commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1003959231


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.lang.reflect.InvocationTargetException
+
+import io.grpc.ServerInterceptor
+import io.grpc.netty.NettyServerBuilder
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.util.Utils
+
+/**
+ * This object provides a global list of configured interceptors for GRPC. The interceptors are
+ * added to the GRPC server in order of their position in the list. Once the statically compiled
+ * interceptors are added, dynamically configured interceptors are added.
+ */
+object SparkConnectInterceptorRegistry {

Review Comment:
   Let's document it, just like what Catalyst does. cc @amaliujia





[GitHub] [spark] grundprinzip commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1003991428


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala:
##########
@@ -26,4 +26,12 @@ private[spark] object Connect {
       .intConf
       .createWithDefault(15002)
 
+  val CONNECT_GRPC_INTERCEPTOR_CLASSES =

Review Comment:
   The reason for being under `sql` is that we need to access some package-protected functionality from Catalyst. In addition, the intention was to make it clear that it's built on top of the SQL functionality.
   
   For the clients it's a different thing. The clients should ideally have exactly the same package names as our client APIs have today.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1004094597


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala:
##########
@@ -26,4 +26,12 @@ private[spark] object Connect {
       .intConf
       .createWithDefault(15002)
 
+  val CONNECT_GRPC_INTERCEPTOR_CLASSES =

Review Comment:
   That's what the Hive Thrift server (separate module), Avro (separate module), and Kafka (separate module for Structured Streaming) do. Pandas API on Spark also leverages runtime configurations via SparkSession under the hood instead of SparkConf.
   
   It is weird that it's a SQL thing that uses the SQL package namespace but doesn't use `SQLConf`.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1004087242


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala:
##########
@@ -26,4 +26,12 @@ private[spark] object Connect {
       .intConf
       .createWithDefault(15002)
 
+  val CONNECT_GRPC_INTERCEPTOR_CLASSES =

Review Comment:
   Then we had better use `StaticSQLConf` or `SQLConf` instead of `SparkConf`.





[GitHub] [spark] cloud-fan closed pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
cloud-fan closed pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.
URL: https://github.com/apache/spark/pull/38320




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1003941174


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.lang.reflect.InvocationTargetException
+
+import io.grpc.ServerInterceptor
+import io.grpc.netty.NettyServerBuilder
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.util.Utils
+
+/**
+ * This object provides a global list of configured interceptors for GRPC. The interceptors are
+ * added to the GRPC server in order of their position in the list. Once the statically compiled
+ * interceptors are added, dynamically configured interceptors are added.
+ */
+object SparkConnectInterceptorRegistry {

Review Comment:
   Or it has to be `private[service]`. Or we should at least mention what is supposed to be an API in `src/main/scala/org/apache/spark/sql/connect/package.scala`.
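
To make the `private[service]` option concrete, here is a small, Spark-free sketch of qualifier-scoped visibility. All names are hypothetical, and an enclosing `service` object stands in for the `org.apache.spark.sql.connect.service` package so the snippet stays in one self-contained unit; with a real package the modifier is written the same way.

```scala
// Hypothetical names throughout; the enclosing `service` object stands in
// for the real `service` package.
object service {

  // Visible only to code inside `service`, so it is not part of any public API.
  private[service] object InterceptorRegistry {
    def configured: Seq[String] = Seq("LoggingInterceptor")
  }

  // Public entry point that uses the registry internally.
  object Server {
    def interceptorNames: Seq[String] = InterceptorRegistry.configured
  }
}
```

Code outside `service` can call `service.Server.interceptorNames`, while a direct reference to `service.InterceptorRegistry` is rejected by the compiler.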





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1003940378


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.lang.reflect.InvocationTargetException
+
+import io.grpc.ServerInterceptor
+import io.grpc.netty.NettyServerBuilder
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.util.Utils
+
+/**
+ * This object provides a global list of configured interceptors for GRPC. The interceptors are
+ * added to the GRPC server in order of their position in the list. Once the statically compiled
+ * interceptors are added, dynamically configured interceptors are added.
+ */
+object SparkConnectInterceptorRegistry {

Review Comment:
   Is this an API? We should mark it `@Unstable` and add the version.





[GitHub] [spark] cloud-fan commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1002901217


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.lang.reflect.InvocationTargetException
+
+import io.grpc.ServerInterceptor
+import io.grpc.netty.NettyServerBuilder
+
+import org.apache.spark.{SparkEnv, SparkIllegalArgumentException, SparkRuntimeException}
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.util.Utils
+
+/**
+ * This object provides a global list of configured interceptors for GRPC. The interceptors are
+ * added to the GRPC server in order of their position in the list. Once the statically compiled
+ * interceptors are added, dynamically configured interceptors are added.
+ */
+object SparkConnectInterceptorRegistry {
+
+  // Contains the list of configured interceptors.
+  private lazy val interceptorChain: Seq[InterceptorBuilder] = Seq(
+    // Adding a new interceptor at compile time works like the example below with the dummy
+    // interceptor:
+    // interceptor[DummyInterceptor](classOf[DummyInterceptor])
+  )
+
+  /**
+   * Given a NettyServerBuilder instance, will chain all interceptors to it in reverse order.
+   * @param sb
+   */
+  def chainInterceptors(sb: NettyServerBuilder): Unit = {
+    interceptorChain.foreach(i => sb.intercept(i()))
+    createConfiguredInterceptors().foreach(sb.intercept(_))
+  }
+
+  // Type used to identify the closure responsible to instantiate a ServerInterceptor.
+  type InterceptorBuilder = () => ServerInterceptor
+
+  /**
+   * For testing only.
+   * @return
+   */
+  def createConfiguredInterceptors(): Seq[ServerInterceptor] = {
+    // Check all values from the Spark conf.
+    val classes = SparkEnv.get.conf.get(Connect.CONNECT_GRPC_INTERCEPTOR_CLASSES)
+    if (classes.nonEmpty) {
+      classes.get
+        .split(",")
+        .map(_.trim)
+        .filter(_.nonEmpty)
+        .map(Utils.classForName[ServerInterceptor](_))
+        .map(createInstance(_))
+    } else {
+      Seq.empty
+    }
+  }
+
+  /**
+   * Creates a new instance of T using the default constructor.
+   * @param cls
+   * @tparam T
+   * @return
+   */
+  private def createInstance[T <: ServerInterceptor](cls: Class[T]): ServerInterceptor = {
+    val ctorOpt = cls.getConstructors.find(_.getParameterCount == 0)
+    if (ctorOpt.isEmpty) {
+      throw new SparkIllegalArgumentException(
+        errorClass = "CONNECT.INTERCEPTOR_CTOR_MISSING",
+        messageParameters = Map("cls" -> cls.getName))
+    }
+    try {
+      ctorOpt.get.newInstance().asInstanceOf[T]
+    } catch {
+      case e: InvocationTargetException =>
+        throw new SparkRuntimeException(
+          errorClass = "CONNECT.INTERCEPTOR_RUNTIME_ERROR",
+          messageParameters = Map("msg" -> e.getTargetException.getMessage),
+          cause = e)
+      case e: Exception =>
+        throw new SparkRuntimeException(
+          errorClass = "CONNECT.INTERCEPTOR_RUNTIME_ERROR",
+          messageParameters = Map("msg" -> "Unknown error"),

Review Comment:
   can't we use `e.getMessage`?
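
A minimal sketch of what this suggestion could look like, assuming the generic `case e: Exception` branch forwards `e.getMessage` instead of the fixed "Unknown error" string. The object name `CreateInstanceSketch` is hypothetical, and plain JDK exceptions stand in for `SparkIllegalArgumentException` and `SparkRuntimeException` so the snippet runs without Spark on the classpath; it is not the code from the PR.

```scala
import java.lang.reflect.InvocationTargetException

// Hypothetical stand-in for the PR's createInstance helper.
// JDK exceptions replace the Spark error classes (assumption for self-containment).
object CreateInstanceSketch {
  def createInstance[T](cls: Class[T]): T = {
    // Look up a public zero-argument constructor, as the PR does.
    val ctor = cls.getConstructors.find(_.getParameterCount == 0).getOrElse(
      throw new IllegalArgumentException(s"No default constructor: ${cls.getName}"))
    try {
      ctor.newInstance().asInstanceOf[T]
    } catch {
      case e: InvocationTargetException =>
        // The constructor body threw; report the underlying message.
        throw new RuntimeException(String.valueOf(e.getTargetException.getMessage), e)
      case e: Exception =>
        // Any other reflection failure: forward e.getMessage rather than "Unknown error".
        throw new RuntimeException(String.valueOf(e.getMessage), e)
    }
  }
}
```

`String.valueOf` is used because `getMessage` may return `null`; whether the real error class accepts a null parameter is not something this thread states.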





[GitHub] [spark] grundprinzip commented on pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on PR #38320:
URL: https://github.com/apache/spark/pull/38320#issuecomment-1285392308

   R: @cloud-fan @amaliujia @HyukjinKwon 




[GitHub] [spark] grundprinzip commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1001504294


##########
core/src/main/resources/error/error-classes.json:
##########
@@ -4276,4 +4293,4 @@
       "Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting <autoBroadcastjoinThreshold> to -1 or increase the spark driver memory by setting <driverMemory> to a higher value<analyzeTblMsg>"
     ]
   }
-}
+}

Review Comment:
   I can check whether the tests pass if this line is removed. I had some trouble with failing tests because they check the style of this JSON file :(





[GitHub] [spark] AmplabJenkins commented on pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #38320:
URL: https://github.com/apache/spark/pull/38320#issuecomment-1285832339

   Can one of the admins verify this patch?




[GitHub] [spark] grundprinzip commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1001503556


##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/service/InterceptorRegistryTest.scala:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import io.grpc.{Metadata, ServerCall, ServerCallHandler, ServerInterceptor}
+import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener
+import io.grpc.netty.NettyServerBuilder
+
+import org.apache.spark.{SparkEnv, SparkIllegalArgumentException, SparkRuntimeException}
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Used for testing only.
+ */
+class TestingInterceptorNoTrivialCtor(id: Int) extends ServerInterceptor {
+  override def interceptCall[ReqT, RespT](
+      call: ServerCall[ReqT, RespT],
+      headers: Metadata,
+      next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
+    val listener = next.startCall(call, headers)
+    new SimpleForwardingServerCallListener[ReqT](listener) {
+      override def onMessage(message: ReqT): Unit = {
+        delegate().onMessage(message)
+      }
+    }
+  }
+}
+
+/**
+ * Used for testing only.
+ */
+class TestingInterceptorInstantiationError extends ServerInterceptor {
+  throw new ArrayIndexOutOfBoundsException("Bad Error")
+
+  override def interceptCall[ReqT, RespT](
+      call: ServerCall[ReqT, RespT],
+      headers: Metadata,
+      next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
+    val listener = next.startCall(call, headers)
+    new SimpleForwardingServerCallListener[ReqT](listener) {
+      override def onMessage(message: ReqT): Unit = {
+        delegate().onMessage(message)
+      }
+    }
+  }
+}
+
+class InterceptorRegistryTest extends SharedSparkSession {
+
+  override def beforeEach(): Unit = {
+    if (SparkEnv.get.conf.contains(Connect.CONNECT_GRPC_INTERCEPTOR_CLASSES)) {
+      SparkEnv.get.conf.remove(Connect.CONNECT_GRPC_INTERCEPTOR_CLASSES)

Review Comment:
   I can have a look at the closure and see what it does.





[GitHub] [spark] grundprinzip commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1001530483


##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/service/InterceptorRegistryTest.scala:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import io.grpc.{Metadata, ServerCall, ServerCallHandler, ServerInterceptor}
+import io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener
+import io.grpc.netty.NettyServerBuilder
+
+import org.apache.spark.{SparkEnv, SparkIllegalArgumentException, SparkRuntimeException}
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Used for testing only.
+ */
+class TestingInterceptorNoTrivialCtor(id: Int) extends ServerInterceptor {
+  override def interceptCall[ReqT, RespT](
+      call: ServerCall[ReqT, RespT],
+      headers: Metadata,
+      next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
+    val listener = next.startCall(call, headers)
+    new SimpleForwardingServerCallListener[ReqT](listener) {
+      override def onMessage(message: ReqT): Unit = {
+        delegate().onMessage(message)
+      }
+    }
+  }
+}
+
+/**
+ * Used for testing only.
+ */
+class TestingInterceptorInstantiationError extends ServerInterceptor {
+  throw new ArrayIndexOutOfBoundsException("Bad Error")
+
+  override def interceptCall[ReqT, RespT](
+      call: ServerCall[ReqT, RespT],
+      headers: Metadata,
+      next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
+    val listener = next.startCall(call, headers)
+    new SimpleForwardingServerCallListener[ReqT](listener) {
+      override def onMessage(message: ReqT): Unit = {
+        delegate().onMessage(message)
+      }
+    }
+  }
+}
+
+class InterceptorRegistryTest extends SharedSparkSession {
+
+  override def beforeEach(): Unit = {
+    if (SparkEnv.get.conf.contains(Connect.CONNECT_GRPC_INTERCEPTOR_CLASSES)) {
+      SparkEnv.get.conf.remove(Connect.CONNECT_GRPC_INTERCEPTOR_CLASSES)

Review Comment:
   I added a small helper, PTAL.
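
The thread does not show the helper itself. As a rough sketch of the general idea (set a configuration value for the duration of a test body, then restore the previous state), something along these lines would work. The names `ConfHelperSketch` and `withConf` are hypothetical, and a mutable map stands in for `SparkEnv.get.conf` so the snippet is self-contained.

```scala
import scala.collection.mutable

// Hypothetical helper; not the one added in the PR.
object ConfHelperSketch {
  // Stand-in for SparkEnv.get.conf (assumption: a simple string-to-string store).
  val conf: mutable.Map[String, String] = mutable.Map.empty

  // Sets the given pairs, runs the body, then restores whatever was there before.
  def withConf[A](pairs: (String, String)*)(body: => A): A = {
    // Remember the previous value (or absence) of every key we are about to touch.
    val previous = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf(k) = v }
    try body
    finally previous.foreach {
      case (k, Some(old)) => conf(k) = old
      case (k, None) => conf.remove(k)
    }
  }
}
```

With a helper of this shape, each test can wrap its body in `withConf("spark.connect.grpc.interceptor.classes" -> "...") { ... }` rather than clearing the key in `beforeEach`.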





[GitHub] [spark] cloud-fan commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1002900490


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.lang.reflect.InvocationTargetException
+
+import io.grpc.ServerInterceptor
+import io.grpc.netty.NettyServerBuilder
+
+import org.apache.spark.{SparkEnv, SparkIllegalArgumentException, SparkRuntimeException}
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.util.Utils
+
+/**
+ * This object provides a global list of configured interceptors for GRPC. The interceptors are
+ * added to the GRPC server in order of their position in the list. Once the statically compiled
+ * interceptors are added, dynamically configured interceptors are added.
+ */
+object SparkConnectInterceptorRegistry {
+
+  // Contains the list of configured interceptors.
+  private lazy val interceptorChain: Seq[InterceptorBuilder] = Seq(
+    // Adding a new interceptor at compile time works like the example below with the dummy
+    // interceptor:
+    // interceptor[DummyInterceptor](classOf[DummyInterceptor])
+  )
+
+  /**
+   * Given a NettyServerBuilder instance, will chain all interceptors to it in reverse order.
+   * @param sb
+   */
+  def chainInterceptors(sb: NettyServerBuilder): Unit = {
+    interceptorChain.foreach(i => sb.intercept(i()))
+    createConfiguredInterceptors().foreach(sb.intercept(_))
+  }
+
+  // Type used to identify the closure responsible to instantiate a ServerInterceptor.
+  type InterceptorBuilder = () => ServerInterceptor
+
+  /**
+   * For testing only.
+   * @return

Review Comment:
   ```suggestion
      * Exposed for testing only.
   ```





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1003941174


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.lang.reflect.InvocationTargetException
+
+import io.grpc.ServerInterceptor
+import io.grpc.netty.NettyServerBuilder
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.util.Utils
+
+/**
+ * This object provides a global list of configured interceptors for GRPC. The interceptors are
+ * added to the GRPC server in order of their position in the list. Once the statically compiled
+ * interceptors are added, dynamically configured interceptors are added.
+ */
+object SparkConnectInterceptorRegistry {

Review Comment:
   Otherwise it has to be `private`, or we should at least mention what is supposed to be an API in `src/main/scala/org/apache/spark/sql/connect/package.scala`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1003954707


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectInterceptorRegistry.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.service
+
+import java.lang.reflect.InvocationTargetException
+
+import io.grpc.ServerInterceptor
+import io.grpc.netty.NettyServerBuilder
+
+import org.apache.spark.{SparkEnv, SparkException}
+import org.apache.spark.sql.connect.config.Connect
+import org.apache.spark.util.Utils
+
+/**
+ * This object provides a global list of configured interceptors for GRPC. The interceptors are
+ * added to the GRPC server in order of their position in the list. Once the statically compiled
+ * interceptors are added, dynamically configured interceptors are added.
+ */
+object SparkConnectInterceptorRegistry {

Review Comment:
   I thought everything under `org.apache.spark.sql.connect.service` is private, no?





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38320: [SPARK-40857] [CONNECT] Enable configurable GPRC Interceptors.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38320:
URL: https://github.com/apache/spark/pull/38320#discussion_r1004088030


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala:
##########
@@ -26,4 +26,12 @@ private[spark] object Connect {
       .intConf
       .createWithDefault(15002)
 
+  val CONNECT_GRPC_INTERCEPTOR_CLASSES =

Review Comment:
   My point is that it's mixed: it lives in the SQL package, but the configuration being used is `SparkConf`. The configuration name doesn't follow the SQL naming convention either.


