Posted to reviews@spark.apache.org by "zhengruifeng (via GitHub)" <gi...@apache.org> on 2023/04/12 02:35:50 UTC

[GitHub] [spark] zhengruifeng opened a new pull request, #40750: [WIP][CONNECT] Redact the proto message

zhengruifeng opened a new pull request, #40750:
URL: https://github.com/apache/spark/pull/40750

   ### What changes were proposed in this pull request?
   Redact the proto message.
   
   
   ### Why are the changes needed?
   1. For security, e.g. when running:
   `spark.createDataFrame(range(0, 10)).show()`
   before:
   ![image](https://user-images.githubusercontent.com/7322292/231332864-0f73441b-feab-4672-a4bb-251f2c1aa03e.png)
   
   after:
   ![image](https://user-images.githubusercontent.com/7322292/231332893-ee888de4-10a8-4a31-908b-6336c8e3c763.png)
   
   2. `Message.toString` may cause an OOM when the message is large.
   This PR tries to redact the bytes, which make up the main part of the message in `LocalRelation` and `PythonUDF`.
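
   A minimal sketch of the size-only display described above, not the code in this PR: the helper name `redactedLabel`, the mask text, and the use of `Locale.US` are assumptions for illustration.

   ```scala
   import java.text.NumberFormat
   import java.util.Locale

   object RedactSketch {
     // Hypothetical helper: describe a payload by its size only, so the
     // bytes themselves never reach the UI or the logs.
     def redactedLabel(payload: Array[Byte]): String = {
       val format = NumberFormat.getInstance(Locale.US)
       s"********(redacted, size=${format.format(payload.length.toLong)})"
     }
   }
   ```

   Because only the length is formatted, the cost of building the label does not grow with the payload.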
   
   ### Does this PR introduce _any_ user-facing change?
   Yes. Bytes are no longer shown in the UI; only their size is shown.
   
   
   ### How was this patch tested?
   Manually checked.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes and Strings in proto message

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1164939927


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+  private val MAX_BYTES_SIZE = 8
+  private val MAX_STRING_SIZE = 1024
+
+  def abbreviate(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {
+      case (field: FieldDescriptor, string: String)
+          if field.getJavaType == FieldDescriptor.JavaType.STRING && string != null =>
+        val size = string.size
+        if (size > MAX_STRING_SIZE) {
+          builder.setField(field, createString(string.take(MAX_STRING_SIZE), size))
+        } else {
+          builder.setField(field, string)
+        }
+
+      case (field: FieldDescriptor, byteString: ByteString)
+          if field.getJavaType == FieldDescriptor.JavaType.BYTE_STRING && byteString != null =>
+        val size = byteString.size
+        if (size > MAX_BYTES_SIZE) {
+          val prefix = Array.tabulate(MAX_BYTES_SIZE)(byteString.byteAt)
+          builder.setField(field, createByteString(prefix, size))
+        } else {
+          builder.setField(field, byteString)
+        }
+
+      case (field: FieldDescriptor, byteArray: Array[Byte])
+          if field.getJavaType == FieldDescriptor.JavaType.BYTE_STRING && byteArray != null =>
+        val size = byteArray.size
+        if (size > MAX_BYTES_SIZE) {
+          val prefix = byteArray.take(MAX_BYTES_SIZE)
+          builder.setField(field, createByteString(prefix, size))
+        } else {
+          builder.setField(field, byteArray)
+        }
+
+      // TODO: should also support 1, repeated msg; 2, map<xxx, msg>

Review Comment:
   Can we file a JIRA, or remove `TODO`?





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40750: [WIP][CONNECT] Redact the proto message

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163731647


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+
+  def redact(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {

Review Comment:
   OK, I think it was https://github.com/apache/spark/pull/28610, which is slightly different.





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40750: [WIP][CONNECT] Redact the proto message

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163705231


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -52,19 +52,15 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
     session.withActive {
 
       // Add debug information to the query execution so that the jobs are traceable.
-      try {

Review Comment:
   I removed this try-catch for testing purposes (a PyTorch unit test previously failed due to OOM). I will add it back to be more conservative.
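
   One way to keep debug-string generation from failing the request, sketched under assumptions: `safeDebugString` and the `UNKNOWN` fallback are hypothetical names, not taken from `SparkConnectStreamHandler`. Note that `NonFatal` deliberately does not match `OutOfMemoryError`, so this guards against ordinary exceptions only.

   ```scala
   import scala.util.control.NonFatal

   object DebugInfoSketch {
     // Hypothetical helper: render a debug description, falling back to a
     // placeholder if rendering throws a non-fatal exception.
     def safeDebugString(render: () => String, fallback: String = "UNKNOWN"): String =
       try render()
       catch { case NonFatal(_) => fallback }
   }
   ```

   The caller passes the rendering step as a function, so a failure while describing the plan does not prevent the plan from being executed.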





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes and Strings in proto message

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1164942173


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+  private val MAX_BYTES_SIZE = 8
+  private val MAX_STRING_SIZE = 1024
+
+  def abbreviate(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {
+      case (field: FieldDescriptor, string: String)
+          if field.getJavaType == FieldDescriptor.JavaType.STRING && string != null =>
+        val size = string.size
+        if (size > MAX_STRING_SIZE) {
+          builder.setField(field, createString(string.take(MAX_STRING_SIZE), size))
+        } else {
+          builder.setField(field, string)
+        }
+
+      case (field: FieldDescriptor, byteString: ByteString)
+          if field.getJavaType == FieldDescriptor.JavaType.BYTE_STRING && byteString != null =>
+        val size = byteString.size
+        if (size > MAX_BYTES_SIZE) {
+          val prefix = Array.tabulate(MAX_BYTES_SIZE)(byteString.byteAt)
+          builder.setField(field, createByteString(prefix, size))
+        } else {
+          builder.setField(field, byteString)
+        }
+
+      case (field: FieldDescriptor, byteArray: Array[Byte])
+          if field.getJavaType == FieldDescriptor.JavaType.BYTE_STRING && byteArray != null =>
+        val size = byteArray.size
+        if (size > MAX_BYTES_SIZE) {
+          val prefix = byteArray.take(MAX_BYTES_SIZE)
+          builder.setField(field, createByteString(prefix, size))
+        } else {
+          builder.setField(field, byteArray)
+        }
+
+      // TODO: should also support 1, repeated msg; 2, map<xxx, msg>

Review Comment:
   ok, let me open a new ticket





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40750: [WIP][CONNECT] Redact the proto message

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163705231


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -52,19 +52,15 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
     session.withActive {
 
       // Add debug information to the query execution so that the jobs are traceable.
-      try {

Review Comment:
   I removed this try-catch for testing purposes. I will add it back to be more conservative.





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40750: [WIP][CONNECT] Redact the proto message

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163704127


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+
+  def redact(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {

Review Comment:
   > Another way is just to show the first few bytes, and add .... e.g., "\377\377 ...".
   
   Sounds good.
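
   A rough sketch of the "first few bytes plus `...`" idea quoted above. The names `octalEscape` and `preview` are hypothetical, and every byte is escaped as octal for simplicity, which is an assumption and not how protobuf's text format treats printable characters.

   ```scala
   object PrefixPreviewSketch {
     // Render each byte as a three-digit octal escape, e.g. 0xFF -> \377.
     def octalEscape(bytes: Array[Byte]): String =
       bytes.map(b => "\\%03o".format(b & 0xff)).mkString

     // Show at most n leading bytes; mark the cut with " ..." when bytes were dropped.
     def preview(bytes: Array[Byte], n: Int): String =
       if (bytes.length > n) octalEscape(bytes.take(n)) + " ..."
       else octalEscape(bytes)
   }
   ```

   With `n = 2`, a payload starting with two `0xFF` bytes is shown as `\377\377 ...`, while a payload of at most two bytes is shown in full.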





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40750: [WIP][CONNECT] Redact the proto message

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163514889


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+
+  def redact(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {

Review Comment:
   @HyukjinKwon just FYI, I also took a quick look at redaction yesterday,
   but I'm not sure whether it is the proper way :joy:





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes in proto message's debug string

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163894169


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+  private val NUM_FIRST_BYTES = 8
+
+  def abbreviateBytes(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {
+      case (field: FieldDescriptor, byteString: ByteString)
+          if field.getJavaType == FieldDescriptor.JavaType.BYTE_STRING && byteString != null =>
+        val size = byteString.size()
+        if (size > NUM_FIRST_BYTES) {
+          val bytes = Array.ofDim[Byte](NUM_FIRST_BYTES)
+          var i = 0
+          while (i < NUM_FIRST_BYTES) {
+            bytes(i) = byteString.byteAt(i)
+            i += 1
+          }
+          builder.setField(field, createByteString(Some(bytes), size))

Review Comment:
   `ByteString` doesn't provide a slicing or view method, so I think we have to copy.
   But we only copy a few bytes (8 here), so it should be fine.
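
   The copy loop can be written more compactly with `Array.tabulate`. This is a sketch with a hypothetical `copyPrefix` helper; the source is modelled as a size plus an index accessor so the example needs no protobuf dependency.

   ```scala
   object PrefixCopySketch {
     // Copy the first n bytes (or fewer, if the source is shorter) from any
     // source that exposes its size and a byte-at-index accessor.
     def copyPrefix(size: Int, byteAt: Int => Byte, n: Int): Array[Byte] =
       Array.tabulate(math.min(n, size))(byteAt)
   }
   ```

   Only `min(n, size)` bytes are copied, so the cost is bounded by the prefix length and not by the size of the message.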





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40750: [WIP][CONNECT] Redact the proto message

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163704903


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+
+  def redact(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {

Review Comment:
   👌





[GitHub] [spark] grundprinzip commented on a diff in pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes in proto message's debug string

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163865340


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+  private val NUM_FIRST_BYTES = 8
+
+  def abbreviateBytes(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {
+      case (field: FieldDescriptor, byteString: ByteString)
+          if field.getJavaType == FieldDescriptor.JavaType.BYTE_STRING && byteString != null =>
+        val size = byteString.size()
+        if (size > NUM_FIRST_BYTES) {
+          val bytes = Array.ofDim[Byte](NUM_FIRST_BYTES)
+          var i = 0
+          while (i < NUM_FIRST_BYTES) {
+            bytes(i) = byteString.byteAt(i)
+            i += 1
+          }
+          builder.setField(field, createByteString(Some(bytes), size))
+        } else {
+          builder.setField(field, createByteString(None, size))

Review Comment:
   I'm confused about this logic. Assume a short string with `size <= NUM_FIRST_BYTES`: this goes into the `else` branch, and `createByteString` will then return something like `********(redacted, size=23)`. Is this really expected? Shouldn't this just show the short string instead?





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes in proto message's debug string

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163977624


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+  private val NUM_FIRST_BYTES = 8
+
+  def abbreviateBytes(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {
+      case (field: FieldDescriptor, byteString: ByteString)
+          if field.getJavaType == FieldDescriptor.JavaType.BYTE_STRING && byteString != null =>
+        val size = byteString.size()
+        if (size > NUM_FIRST_BYTES) {
+          val bytes = Array.ofDim[Byte](NUM_FIRST_BYTES)
+          var i = 0
+          while (i < NUM_FIRST_BYTES) {
+            bytes(i) = byteString.byteAt(i)
+            i += 1
+          }
+          builder.setField(field, createByteString(Some(bytes), size))
+        } else {
+          builder.setField(field, createByteString(None, size))

Review Comment:
   ok, will just show the original short string in this case.
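
   A sketch of the agreed behaviour, assuming a hypothetical `abbreviate` helper and a `[truncated(size=N)]` suffix chosen for illustration: values at or below the threshold are returned unchanged, and only longer values are cut.

   ```scala
   import java.nio.charset.StandardCharsets

   object AbbreviateSketch {
     // Keep short values as they are; for longer ones keep the first
     // maxSize bytes and append a marker carrying the original size.
     def abbreviate(bytes: Array[Byte], maxSize: Int = 8): Array[Byte] =
       if (bytes.length > maxSize) {
         val suffix = s"[truncated(size=${bytes.length})]".getBytes(StandardCharsets.UTF_8)
         bytes.take(maxSize) ++ suffix
       } else {
         bytes
       }
   }
   ```

   Note the boundary: a value of exactly `maxSize` bytes takes the `else` branch and is shown in full.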





[GitHub] [spark] HyukjinKwon commented on pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes and Strings in proto message

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #40750:
URL: https://github.com/apache/spark/pull/40750#issuecomment-1507780590

   Merged to master.




[GitHub] [spark] zhengruifeng commented on a diff in pull request #40750: [WIP][CONNECT] Redact the proto message

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163700008


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+
+  def redact(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {

Review Comment:
   Actually, I tried to reimplement the `Message.toString` method manually, but it needs some private classes (e.g. `TextGenerator`).
   In the `com.google.protobuf.TextFormat#printFieldValue` method, I found that the value for BYTES can be either a `ByteString` or a `byte[]`:
   
   ```
           case BYTES:
             generator.print("\"");
             if (value instanceof ByteString) {
               generator.print(escapeBytes((ByteString) value));
             } else {
               generator.print(escapeBytes((byte[]) value));
             }
             generator.print("\"");
             break;
   ```
   
   The `Array[_]` case here is meant to match `Array[Byte]`.
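
   To illustrate the point above, here is a minimal sketch (not code from this PR) that handles both runtime representations of a BYTES value. `BytesValueSketch` and `bytesSize` are hypothetical names, and the sketch assumes protobuf-java is on the classpath:

   ```scala
   import com.google.protobuf.ByteString

   object BytesValueSketch {
     // Returns the payload size when the value is one of the two
     // representations that TextFormat handles for BYTES, else None.
     def bytesSize(value: Any): Option[Int] = value match {
       case bs: ByteString => Some(bs.size)      // the usual representation
       case arr: Array[Byte] => Some(arr.length) // the byte[] branch
       case _ => None                            // not a BYTES value
     }
   }
   ```

   Matching on `Array[Byte]` rather than `Array[_]` would keep other array types out of this branch.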





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes in proto message's debug string

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163838955


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -52,19 +52,15 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
     session.withActive {
 
       // Add debug information to the query execution so that the jobs are traceable.
-      try {

Review Comment:
   Yes, I added it for that PyTorch test case, in which the UDF is 47 MB in size and causes an OOM.
   
   But I am not sure whether there are unknown edge cases that could also cause failures, so I would personally prefer to add the try-catch back before merging.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes and Strings in proto message

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1164797549


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+  private val MAX_BYTES_SIZE = 8
+  private val MAX_STRING_SIZE = 1024
+
+  def abbreviate(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {
+      case (field: FieldDescriptor, string: String)
+          if field.getJavaType == FieldDescriptor.JavaType.STRING && string != null =>
+        val size = string.size
+        if (size > MAX_STRING_SIZE) {
+          builder.setField(field, createString(string.take(MAX_STRING_SIZE), size))
+        } else {
+          builder.setField(field, string)
+        }
+
+      case (field: FieldDescriptor, byteString: ByteString)
+          if field.getJavaType == FieldDescriptor.JavaType.BYTE_STRING && byteString != null =>
+        val size = byteString.size
+        if (size > MAX_BYTES_SIZE) {
+          val prefix = Array.tabulate(MAX_BYTES_SIZE)(byteString.byteAt)
+          builder.setField(field, createByteString(prefix, size))
+        } else {
+          builder.setField(field, byteString)
+        }
+
+      case (field: FieldDescriptor, byteArray: Array[Byte])
+          if field.getJavaType == FieldDescriptor.JavaType.BYTE_STRING && byteArray != null =>
+        val size = byteArray.size
+        if (size > MAX_BYTES_SIZE) {
+          val prefix = byteArray.take(MAX_BYTES_SIZE)
+          builder.setField(field, createByteString(prefix, size))
+        } else {
+          builder.setField(field, byteArray)
+        }
+
+      // TODO: should also support 1, repeated msg; 2, map<xxx, msg>
+      case (field: FieldDescriptor, msg: Message)
+          if field.getJavaType == FieldDescriptor.JavaType.MESSAGE && msg != null =>
+        builder.setField(field, abbreviate(msg))
+
+      case (field: FieldDescriptor, value: Any) => builder.setField(field, value)
+    }
+
+    builder.build()
+  }
+
+  private def createByteString(prefix: Array[Byte], size: Int): ByteString = {
+    ByteString.copyFrom(
+      List(
+        ByteString.copyFrom(prefix),
+        ByteString.copyFromUtf8(s"*********(redacted, size=${format.format(size)})")).asJava)

Review Comment:
   Yeah, let's just focus on abbreviating instead of redacting. This code path would likely have to change before 4.0 for better UI in any event.





[GitHub] [spark] HyukjinKwon closed pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes and Strings in proto message

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon closed pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes and Strings in proto message
URL: https://github.com/apache/spark/pull/40750




[GitHub] [spark] grundprinzip commented on a diff in pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes in proto message's debug string

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163860871


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+  private val NUM_FIRST_BYTES = 8
+
+  def abbreviateBytes(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {
+      case (field: FieldDescriptor, byteString: ByteString)
+          if field.getJavaType == FieldDescriptor.JavaType.BYTE_STRING && byteString != null =>
+        val size = byteString.size()
+        if (size > NUM_FIRST_BYTES) {
+          val bytes = Array.ofDim[Byte](NUM_FIRST_BYTES)
+          var i = 0
+          while (i < NUM_FIRST_BYTES) {
+            bytes(i) = byteString.byteAt(i)
+            i += 1
+          }
+          builder.setField(field, createByteString(Some(bytes), size))

Review Comment:
   This creates at least one additional copy of the bytes. Could we avoid it by passing the `ByteString` directly into the `createByteString` method?
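
   A rough sketch of what that could look like, assuming protobuf-java is on the classpath. `ByteStringPrefixSketch` and `MaxBytes` are hypothetical names, and the `[truncated(size=...)]` marker is only a placeholder format, not what the PR emits:

   ```scala
   import com.google.protobuf.ByteString

   object ByteStringPrefixSketch {
     private val MaxBytes = 8

     // Takes the prefix with ByteString.substring instead of copying
     // byte-by-byte into an intermediate Array[Byte] first.
     def abbreviate(bytes: ByteString): ByteString = {
       val size = bytes.size
       if (size <= MaxBytes) {
         bytes
       } else {
         val marker = ByteString.copyFromUtf8(s"[truncated(size=$size)]")
         bytes.substring(0, MaxBytes).concat(marker)
       }
     }
   }
   ```

   Whether this actually saves a copy depends on the `ByteString` implementation in use, so it would need to be measured.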





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes in proto message's debug string

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163894481


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+  private val NUM_FIRST_BYTES = 8
+
+  def abbreviateBytes(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {
+      case (field: FieldDescriptor, byteString: ByteString)

Review Comment:
   Makes sense.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40750: [WIP][CONNECT] Redact the proto message

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163686435


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+
+  def redact(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {

Review Comment:
   This is almost exactly the same approach I had in mind when I took a quick look this morning. I also wasn't sure how to represent these values.
   
   Doing `s"bytes (size=${format.format(bytes.size)})"` seems good enough. BTW, when can this be `Array[_]`? I thought a `bytes` field always returns a `ByteString`.
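
   For reference, a small sketch of the label this would produce. `SizeLabelSketch` and `describe` are hypothetical names. The locale is pinned to `Locale.US` here only to make the grouping separator predictable, whereas the PR uses the default-locale `NumberFormat.getInstance()`:

   ```scala
   import java.text.NumberFormat
   import java.util.Locale

   object SizeLabelSketch {
     // Assumption: US locale, so digits are grouped with commas.
     private val format = NumberFormat.getInstance(Locale.US)

     // Replaces the payload with a short label that only reports its size.
     def describe(bytes: Array[Byte]): String =
       s"bytes (size=${format.format(bytes.length)})"
   }
   ```

   With a 1,234,567-byte payload, this yields `bytes (size=1,234,567)`.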





[GitHub] [spark] gengliangwang commented on a diff in pull request #40750: [WIP][CONNECT] Redact the proto message

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163711978


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+
+  def redact(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {

Review Comment:
   @HyukjinKwon The Spark UI redacts key-value pairs as `*********(redacted)` if the key matches a certain pattern, `(?i)secret|password|token|access[.]key`:
   https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/Utils.scala#L2827
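
   For illustration only, a simplified sketch of that key-based redaction. This is not the code in `Utils.scala`: `RedactionSketch` and `redact` are hypothetical names, the pattern is hard-coded here, and the real implementation may differ (for example, in how the pattern is configured or what it is matched against):

   ```scala
   import scala.util.matching.Regex

   object RedactionSketch {
     private val Pattern: Regex = "(?i)secret|password|token|access[.]key".r
     private val Replacement = "*********(redacted)"

     // Replaces the value of every pair whose key contains a match.
     def redact(kvs: Seq[(String, String)]): Seq[(String, String)] =
       kvs.map { case (key, value) =>
         if (Pattern.findFirstIn(key).isDefined) (key, Replacement)
         else (key, value)
       }
   }
   ```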





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes in proto message's debug string

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163891735


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+  private val NUM_FIRST_BYTES = 8
+
+  def abbreviateBytes(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {
+      case (field: FieldDescriptor, byteString: ByteString)
+          if field.getJavaType == FieldDescriptor.JavaType.BYTE_STRING && byteString != null =>
+        val size = byteString.size()
+        if (size > NUM_FIRST_BYTES) {
+          val bytes = Array.ofDim[Byte](NUM_FIRST_BYTES)
+          var i = 0
+          while (i < NUM_FIRST_BYTES) {
+            bytes(i) = byteString.byteAt(i)
+            i += 1
+          }
+          builder.setField(field, createByteString(Some(bytes), size))
+        } else {
+          builder.setField(field, createByteString(None, size))

Review Comment:
   It is to avoid showing all the raw data in `LocalRelation`.





[GitHub] [spark] grundprinzip commented on a diff in pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes in proto message's debug string

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163859269


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+  private val NUM_FIRST_BYTES = 8
+
+  def abbreviateBytes(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {
+      case (field: FieldDescriptor, byteString: ByteString)

Review Comment:
   I think it would be great if we did the same for very long regular strings. For example, someone might submit a 4 MB SQL query, and it would be good to abbreviate that as well.
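
   Something along these lines, as a sketch only. `StringAbbreviationSketch`, the `maxSize` parameter, and the `[truncated(size=...)]` marker are placeholders, not a proposal for the final format:

   ```scala
   object StringAbbreviationSketch {
     // Keeps at most maxSize leading characters and appends the original length.
     def abbreviate(s: String, maxSize: Int): String =
       if (s.length <= maxSize) s
       else s"${s.take(maxSize)}[truncated(size=${s.length})]"
   }
   ```

   Short strings pass through unchanged, so typical SQL text would still be shown in full.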





[GitHub] [spark] zhengruifeng commented on pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes in proto message's debug string

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #40750:
URL: https://github.com/apache/spark/pull/40750#issuecomment-1505080460

   OK, on second thought, I think we should narrow this PR to abbreviation only.
   
   I think we can support redaction in the future with an approach like this:
   
   ```
   {
   ...
         case (field: FieldDescriptor, relation: proto.LocalRelation)
             if field.getJavaType == FieldDescriptor.JavaType.MESSAGE && relation != null =>
           builder.setField(field, redactLocalRelation(relation))
   ...
   }
   
   private def redactLocalRelation(relation: proto.LocalRelation): proto.LocalRelation = {
   
   ....
   }
   
   ```




[GitHub] [spark] grundprinzip commented on a diff in pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes and Strings in proto message

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1164087798


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+  private val MAX_BYTES_SIZE = 8
+  private val MAX_STRING_SIZE = 1024
+
+  def abbreviate(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {
+      case (field: FieldDescriptor, string: String)
+          if field.getJavaType == FieldDescriptor.JavaType.STRING && string != null =>
+        val size = string.size
+        if (size > MAX_STRING_SIZE) {
+          builder.setField(field, createString(string.take(MAX_STRING_SIZE), size))
+        } else {
+          builder.setField(field, string)
+        }
+
+      case (field: FieldDescriptor, byteString: ByteString)
+          if field.getJavaType == FieldDescriptor.JavaType.BYTE_STRING && byteString != null =>
+        val size = byteString.size
+        if (size > MAX_BYTES_SIZE) {
+          val prefix = Array.tabulate(MAX_BYTES_SIZE)(byteString.byteAt)
+          builder.setField(field, createByteString(prefix, size))
+        } else {
+          builder.setField(field, byteString)
+        }
+
+      case (field: FieldDescriptor, byteArray: Array[Byte])
+          if field.getJavaType == FieldDescriptor.JavaType.BYTE_STRING && byteArray != null =>
+        val size = byteArray.size
+        if (size > MAX_BYTES_SIZE) {
+          val prefix = byteArray.take(MAX_BYTES_SIZE)
+          builder.setField(field, createByteString(prefix, size))
+        } else {
+          builder.setField(field, byteArray)
+        }
+
+      // TODO: should also support 1, repeated msg; 2, map<xxx, msg>
+      case (field: FieldDescriptor, msg: Message)
+          if field.getJavaType == FieldDescriptor.JavaType.MESSAGE && msg != null =>
+        builder.setField(field, abbreviate(msg))
+
+      case (field: FieldDescriptor, value: Any) => builder.setField(field, value)
+    }
+
+    builder.build()
+  }
+
+  private def createByteString(prefix: Array[Byte], size: Int): ByteString = {
+    ByteString.copyFrom(
+      List(
+        ByteString.copyFrom(prefix),
+        ByteString.copyFromUtf8(s"*********(redacted, size=${format.format(size)})")).asJava)
+  }
+
+  private def createString(prefix: String, size: Int): String = {
+    s"$prefix*********(redacted, size=${format.format(size)})"

Review Comment:
   Similar here
   ```
   "$prefix[truncated(size=XXX)]"
   ```





[GitHub] [spark] grundprinzip commented on a diff in pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes and Strings in proto message

Posted by "grundprinzip (via GitHub)" <gi...@apache.org>.
grundprinzip commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1164087423


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+  private val MAX_BYTES_SIZE = 8
+  private val MAX_STRING_SIZE = 1024
+
+  def abbreviate(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {
+      case (field: FieldDescriptor, string: String)
+          if field.getJavaType == FieldDescriptor.JavaType.STRING && string != null =>
+        val size = string.size
+        if (size > MAX_STRING_SIZE) {
+          builder.setField(field, createString(string.take(MAX_STRING_SIZE), size))
+        } else {
+          builder.setField(field, string)
+        }
+
+      case (field: FieldDescriptor, byteString: ByteString)
+          if field.getJavaType == FieldDescriptor.JavaType.BYTE_STRING && byteString != null =>
+        val size = byteString.size
+        if (size > MAX_BYTES_SIZE) {
+          val prefix = Array.tabulate(MAX_BYTES_SIZE)(byteString.byteAt)
+          builder.setField(field, createByteString(prefix, size))
+        } else {
+          builder.setField(field, byteString)
+        }
+
+      case (field: FieldDescriptor, byteArray: Array[Byte])
+          if field.getJavaType == FieldDescriptor.JavaType.BYTE_STRING && byteArray != null =>
+        val size = byteArray.size
+        if (size > MAX_BYTES_SIZE) {
+          val prefix = byteArray.take(MAX_BYTES_SIZE)
+          builder.setField(field, createByteString(prefix, size))
+        } else {
+          builder.setField(field, byteArray)
+        }
+
+      // TODO: should also support 1, repeated msg; 2, map<xxx, msg>
+      case (field: FieldDescriptor, msg: Message)
+          if field.getJavaType == FieldDescriptor.JavaType.MESSAGE && msg != null =>
+        builder.setField(field, abbreviate(msg))
+
+      case (field: FieldDescriptor, value: Any) => builder.setField(field, value)
+    }
+
+    builder.build()
+  }
+
+  private def createByteString(prefix: Array[Byte], size: Int): ByteString = {
+    ByteString.copyFrom(
+      List(
+        ByteString.copyFrom(prefix),
+        ByteString.copyFromUtf8(s"*********(redacted, size=${format.format(size)})")).asJava)

Review Comment:
   Since this is no longer "redacted", what about making the format string something like:
   
   ```
   "[truncated(size=XXX)]"
   ```
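
A minimal sketch of the suggested marker, assuming plain `Array[Byte]` and `String` inputs instead of protobuf types. The object name, method names, and the fixed `Locale.US` are illustrative assumptions and are not taken from the PR.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.text.NumberFormat
import java.util.Locale

object TruncatedSuffixSketch {
  // Hypothetical helper: builds the "[truncated(size=XXX)]" marker.
  // A fresh formatter per call avoids sharing a non-thread-safe NumberFormat.
  private def suffix(size: Int): String =
    s"[truncated(size=${NumberFormat.getInstance(Locale.US).format(size.toLong)})]"

  // Keep the first maxSize bytes and append the marker as UTF-8 bytes.
  def abbreviateBytes(bytes: Array[Byte], maxSize: Int): Array[Byte] =
    if (bytes.length > maxSize) bytes.take(maxSize) ++ suffix(bytes.length).getBytes(UTF_8)
    else bytes

  // Keep the first maxSize characters and append the marker.
  def abbreviateString(s: String, maxSize: Int): String =
    if (s.length > maxSize) s.take(maxSize) + suffix(s.length) else s
}
```

With these definitions, `TruncatedSuffixSketch.abbreviateString("a" * 10, 4)` yields `aaaa[truncated(size=10)]`, while inputs at or below the limit are returned unchanged.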





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40750: [WIP][CONNECT] Redact the proto message

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163687291


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+
+  def redact(message: Message): Message = {

Review Comment:
   I think the method name should be something like `abbreviateBytes`.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40750: [WIP][CONNECT] Redact the proto message

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163705820


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -52,19 +52,15 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
     session.withActive {
 
       // Add debug information to the query execution so that the jobs are traceable.
-      try {

Review Comment:
   I think it's fine to remove it (you added it earlier to address this specific case, right?). I don't mind removing it.





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes in proto message's debug string

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1164010915


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+  private val NUM_FIRST_BYTES = 8
+
+  def abbreviateBytes(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {
+      case (field: FieldDescriptor, byteString: ByteString)

Review Comment:
   done
   
   ```
   In [1]: query = "SELECT" + " " * 8192 + "1"
   
   In [2]: len(query)
   Out[2]: 8199
   
   In [3]: spark.sql(query).show()
   +---+
   |  1|
   +---+
   |  1|
   +---+
   ```
   
   ![image](https://user-images.githubusercontent.com/7322292/231446399-e92d3d29-1a82-4b2f-8cda-e1f66d0a2616.png)
   





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40750: [WIP][CONNECT] Redact the proto message

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163729097


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+
+  def redact(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {

Review Comment:
   Oh no, I meant abbreviating it when it's too long.





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes and Strings in proto message

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1164900701


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+  private val MAX_BYTES_SIZE = 8
+  private val MAX_STRING_SIZE = 1024
+
+  def abbreviate(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {
+      case (field: FieldDescriptor, string: String)
+          if field.getJavaType == FieldDescriptor.JavaType.STRING && string != null =>
+        val size = string.size
+        if (size > MAX_STRING_SIZE) {
+          builder.setField(field, createString(string.take(MAX_STRING_SIZE), size))
+        } else {
+          builder.setField(field, string)
+        }
+
+      case (field: FieldDescriptor, byteString: ByteString)
+          if field.getJavaType == FieldDescriptor.JavaType.BYTE_STRING && byteString != null =>
+        val size = byteString.size
+        if (size > MAX_BYTES_SIZE) {
+          val prefix = Array.tabulate(MAX_BYTES_SIZE)(byteString.byteAt)
+          builder.setField(field, createByteString(prefix, size))
+        } else {
+          builder.setField(field, byteString)
+        }
+
+      case (field: FieldDescriptor, byteArray: Array[Byte])
+          if field.getJavaType == FieldDescriptor.JavaType.BYTE_STRING && byteArray != null =>
+        val size = byteArray.size
+        if (size > MAX_BYTES_SIZE) {
+          val prefix = byteArray.take(MAX_BYTES_SIZE)
+          builder.setField(field, createByteString(prefix, size))
+        } else {
+          builder.setField(field, byteArray)
+        }
+
+      // TODO: should also support 1, repeated msg; 2, map<xxx, msg>
+      case (field: FieldDescriptor, msg: Message)
+          if field.getJavaType == FieldDescriptor.JavaType.MESSAGE && msg != null =>
+        builder.setField(field, abbreviate(msg))
+
+      case (field: FieldDescriptor, value: Any) => builder.setField(field, value)
+    }
+
+    builder.build()
+  }
+
+  private def createByteString(prefix: Array[Byte], size: Int): ByteString = {
+    ByteString.copyFrom(
+      List(
+        ByteString.copyFrom(prefix),
+        ByteString.copyFromUtf8(s"*********(redacted, size=${format.format(size)})")).asJava)
+  }
+
+  private def createString(prefix: String, size: Int): String = {
+    s"$prefix*********(redacted, size=${format.format(size)})"

Review Comment:
   done





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40750: [WIP][CONNECT] Redact the proto message

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163688962


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+
+  def redact(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {

Review Comment:
   Another way is to show just the first few bytes and append `...`, e.g., `"\377\377 ..."`. @gengliangwang, do you have any ideas about this abbreviation? I remember we did something similar somewhere in the UI when handling some events.
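
A minimal sketch of that idea, assuming a plain `Array[Byte]` input. The names `BytePreviewSketch`, `preview`, and `numFirstBytes` are hypothetical, and the octal escaping only imitates the style of the `"\377\377 ..."` example above; it does not call any protobuf API.

```scala
object BytePreviewSketch {
  // Printable ASCII is kept as-is; every other byte (plus quote and backslash)
  // becomes a backslash followed by three octal digits.
  private def escape(b: Byte): String = {
    val v = b & 0xff
    if (v >= 0x20 && v < 0x7f && v != '"' && v != '\\') v.toChar.toString
    else "\\" + "%03o".format(v)
  }

  // Show only the first numFirstBytes bytes and mark the cut with " ...".
  def preview(bytes: Array[Byte], numFirstBytes: Int): String = {
    val head = bytes.take(numFirstBytes).map(escape).mkString
    if (bytes.length > numFirstBytes) s"$head ..." else head
  }
}
```

For three bytes `0xff, 0xff, 0x01` and `numFirstBytes = 2`, `preview` returns the text `\377\377 ...`; shorter inputs are rendered in full without the trailing marker.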





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40750: [WIP][CONNECT] Redact the proto message

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163701889


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+
+  def redact(message: Message): Message = {

Review Comment:
   We could also change other fields in this method via pattern matching, but right now it only abbreviates the bytes fields, so `abbreviateBytes` LGTM.
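
To illustrate how pattern matching could extend the method to other field types, here is a simplified sketch. It models a message as a plain `Map[String, Any]` of field name to value, which is an assumption made so the example runs without protobuf; the limits and the `...` marker are likewise illustrative.

```scala
object FieldAbbreviationSketch {
  private val MaxBytes = 8
  private val MaxString = 16

  // Each case handles one kind of field value; unmatched fields pass through.
  def abbreviate(fields: Map[String, Any]): Map[String, Any] = fields.map {
    case (name, s: String) if s.length > MaxString =>
      name -> (s.take(MaxString) + "...")
    case (name, b: Array[Byte]) if b.length > MaxBytes =>
      name -> b.take(MaxBytes)
    case (name, nested: Map[_, _]) =>
      // Nested "messages" are abbreviated recursively.
      name -> abbreviate(nested.asInstanceOf[Map[String, Any]])
    case other => other
  }
}
```

Adding support for another value type then amounts to adding one more `case` before the catch-all.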





[GitHub] [spark] zhengruifeng commented on pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes in proto message's debug string

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on PR #40750:
URL: https://github.com/apache/spark/pull/40750#issuecomment-1504922648

   also cc @grundprinzip 




[GitHub] [spark] zhengruifeng commented on a diff in pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes in proto message's debug string

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1163838955


##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -52,19 +52,15 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[ExecutePlanResp
     session.withActive {
 
       // Add debug information to the query execution so that the jobs are traceable.
-      try {

Review Comment:
   Yes, I added it for that PyTorch test case, in which the UDF is 47 MB and causes an OOM.
   
   But I am not sure whether other unknown edge cases could also cause failures, so I personally prefer adding the try-catch back before merging.
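
A defensive wrapper of that kind might look like the sketch below. `safeDescription`, the `describe` callback, and the `"UNKNOWN"` fallback are hypothetical names chosen for illustration; the actual handler code is not shown in this thread.

```scala
import scala.util.control.NonFatal

object SafeDescriptionSketch {
  // `describe` stands in for whatever builds the debug string for the job.
  def safeDescription(describe: () => String, fallback: String = "UNKNOWN"): String =
    try describe()
    catch {
      // NonFatal does not match OutOfMemoryError or other VM errors,
      // so this only guards against ordinary exceptions.
      case NonFatal(_) => fallback
    }
}
```

Because `NonFatal` excludes `OutOfMemoryError`, a wrapper like this complements the abbreviation rather than replacing it: abbreviation keeps the string small, and the wrapper covers unexpected exceptions while building it.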





[GitHub] [spark] zhengruifeng commented on a diff in pull request #40750: [SPARK-43105][CONNECT] Abbreviate Bytes and Strings in proto message

Posted by "zhengruifeng (via GitHub)" <gi...@apache.org>.
zhengruifeng commented on code in PR #40750:
URL: https://github.com/apache/spark/pull/40750#discussion_r1164803678


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/ProtoUtils.scala:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connect.common
+
+import scala.collection.JavaConverters._
+
+import com.google.protobuf.{ByteString, Message}
+import com.google.protobuf.Descriptors.FieldDescriptor
+
+private[connect] object ProtoUtils {
+  private val format = java.text.NumberFormat.getInstance()
+  private val MAX_BYTES_SIZE = 8
+  private val MAX_STRING_SIZE = 1024
+
+  def abbreviate(message: Message): Message = {
+    val builder = message.toBuilder
+
+    message.getAllFields.asScala.iterator.foreach {
+      case (field: FieldDescriptor, string: String)
+          if field.getJavaType == FieldDescriptor.JavaType.STRING && string != null =>
+        val size = string.size
+        if (size > MAX_STRING_SIZE) {
+          builder.setField(field, createString(string.take(MAX_STRING_SIZE), size))
+        } else {
+          builder.setField(field, string)
+        }
+
+      case (field: FieldDescriptor, byteString: ByteString)
+          if field.getJavaType == FieldDescriptor.JavaType.BYTE_STRING && byteString != null =>
+        val size = byteString.size
+        if (size > MAX_BYTES_SIZE) {
+          val prefix = Array.tabulate(MAX_BYTES_SIZE)(byteString.byteAt)
+          builder.setField(field, createByteString(prefix, size))
+        } else {
+          builder.setField(field, byteString)
+        }
+
+      case (field: FieldDescriptor, byteArray: Array[Byte])
+          if field.getJavaType == FieldDescriptor.JavaType.BYTE_STRING && byteArray != null =>
+        val size = byteArray.size
+        if (size > MAX_BYTES_SIZE) {
+          val prefix = byteArray.take(MAX_BYTES_SIZE)
+          builder.setField(field, createByteString(prefix, size))
+        } else {
+          builder.setField(field, byteArray)
+        }
+
+      // TODO: should also support 1, repeated msg; 2, map<xxx, msg>
+      case (field: FieldDescriptor, msg: Message)
+          if field.getJavaType == FieldDescriptor.JavaType.MESSAGE && msg != null =>
+        builder.setField(field, abbreviate(msg))
+
+      case (field: FieldDescriptor, value: Any) => builder.setField(field, value)
+    }
+
+    builder.build()
+  }
+
+  private def createByteString(prefix: Array[Byte], size: Int): ByteString = {
+    ByteString.copyFrom(
+      List(
+        ByteString.copyFrom(prefix),
+        ByteString.copyFromUtf8(s"*********(redacted, size=${format.format(size)})")).asJava)

Review Comment:
   done


