You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2023/03/01 21:30:39 UTC

[kudu] branch branch-1.17.x updated: KUDU-3450 handling of oversized messages in Ranger process wrapper

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch branch-1.17.x
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/branch-1.17.x by this push:
     new 299acb8dc KUDU-3450 handling of oversized messages in Ranger process wrapper
299acb8dc is described below

commit 299acb8dca77579b4fcc7b807f9e726c24a91386
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Sat Feb 18 08:34:19 2023 -0800

    KUDU-3450 handling of oversized messages in Ranger process wrapper
    
    This patch updates the Ranger client process wrapper to handle
    oversized messages.  With this patch, upon receiving an oversized
    message, the wrapper discards the message and logs an error.
    After that, the input stream is ready to receive subsequent messages.
    A corresponding unit test for MessageIO.readBytes() is added as well.
    
    In addition, the master's --subprocess_max_message_size_bytes flag now
    controls the maximum message size in the Ranger client process wrapper
    as well.  SubprocessConfiguration.MAX_MESSAGE_BYTES_DEFAULT has been
    updated to match the default value of the flag.
    
    This is a follow-up to ae22d32ef5895a02a763665cfd2812aa61be5ab3.
    
    Change-Id: I9fe57bdeeb8a5515578e3feb7cacbbb0d7692e3e
    Reviewed-on: http://gerrit.cloudera.org:8080/19516
    Tested-by: Alexey Serbin <al...@apache.org>
    Reviewed-by: Attila Bukor <ab...@apache.org>
    (cherry picked from commit 7b593aedaded2f01fbf8a6094524e6012ad542e7)
    Reviewed-on: http://gerrit.cloudera.org:8080/19563
    Reviewed-by: Yingchun Lai <la...@apache.org>
    Tested-by: Kudu Jenkins
---
 .../java/org/apache/kudu/subprocess/MessageIO.java | 38 +++++++++++++--
 .../org/apache/kudu/subprocess/MessageReader.java  |  3 ++
 .../kudu/subprocess/SubprocessConfiguration.java   |  2 +-
 .../org/apache/kudu/subprocess/TestMessageIO.java  | 55 ++++++++++++++--------
 src/kudu/ranger/ranger_client.cc                   |  3 ++
 5 files changed, 77 insertions(+), 24 deletions(-)

diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageIO.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageIO.java
index bcb15f906..538904661 100644
--- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageIO.java
+++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageIO.java
@@ -58,6 +58,8 @@ public class MessageIO {
    * @throws IOException if this input stream has been closed, an I/O
    *                     error occurs, or fail to read the message
    *                     properly
+   * @throws KuduSubprocessException if there was an oversized message
+   *                                 in the stream
    */
   @VisibleForTesting
   byte[] readBytes() throws EOFException, IOException {
@@ -67,9 +69,12 @@ public class MessageIO {
     doRead(sizeBytes, Integer.BYTES);
     int size = bytesToInt(sizeBytes);
     if (size > maxMessageBytes) {
-      throw new IOException(
-          String.format("message size (%d) exceeds maximum message size (%d)",
-                        size, maxMessageBytes));
+      // Read out and discard the oversized message, so the channel is available
+      // for further communication.
+      doReadAndDiscard(size);
+      throw new KuduSubprocessException(String.format(
+          "message size (%d) exceeds maximum message size (%d): message is discarded",
+          size, maxMessageBytes));
     }
     // Read the body based on the size.
     byte[] dataBytes = new byte[size];
@@ -98,6 +103,33 @@ public class MessageIO {
     }
   }
 
+  /**
+   * Reads <code>size</code> bytes of data from the underlying buffered input
+   * stream and discards all the bytes read. Short reads are not treated as
+   * errors: reading continues until <code>size</code> bytes are consumed.
+   *
+   * @throws EOFException if the end of the stream is reached before
+   *                      <code>size</code> bytes have been read
+   * @throws IOException if this input stream has been closed or an I/O
+   *                     error occurs
+   */
+  private void doReadAndDiscard(int size) throws EOFException, IOException {
+    final int chunkSize = 4096;
+    byte[] buf = new byte[chunkSize];
+    int rem = size;
+    while (rem > 0) {
+      // InputStream.read(byte[], int, int) may return fewer bytes than asked
+      // for when the underlying stream (e.g. a pipe) has no more data yet;
+      // that is not an error, so keep reading until the message is consumed.
+      int read = in.read(buf, 0, Math.min(chunkSize, rem));
+      if (read == -1) {
+        throw new EOFException(String.format("the end of the stream " +
+            "has been reached while reading out oversized message (%d bytes)", size));
+      }
+      rem -= read;
+    }
+  }
+
   /**
    * Writes a protobuf message to the buffered output stream. Since we flush
    * after writing each message, with the underlying buffer size being the
diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
index 06ad4b807..bc0878ef6 100644
--- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
+++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/MessageReader.java
@@ -66,6 +66,9 @@ class MessageReader implements Runnable {
       byte[] data;
       try {
         data = messageIO.readBytes();
+      } catch (KuduSubprocessException e) {
+        LOG.error("{}: continuing", e.getMessage());
+        continue;  // readBytes() has already discarded the oversized message
       } catch (EOFException e) {
         LOG.info("Reaching the end of the input stream, exiting.");
         // Break the loop if the end of the stream has been reached.
diff --git a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java
index e80f3ba2a..17d6dfa39 100644
--- a/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java
+++ b/java/kudu-subprocess/src/main/java/org/apache/kudu/subprocess/SubprocessConfiguration.java
@@ -51,7 +51,7 @@ public class SubprocessConfiguration {
   private OutputStream outputStream;
 
   @VisibleForTesting
-  static final int MAX_MESSAGE_BYTES_DEFAULT = 1024 * 1024;
+  static final int MAX_MESSAGE_BYTES_DEFAULT = 8 * 1024 * 1024;
 
   public SubprocessConfiguration(String[] args) {
     parse(args);
diff --git a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageIO.java b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageIO.java
index 98fc0b04c..2c55cb88d 100644
--- a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageIO.java
+++ b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/TestMessageIO.java
@@ -97,16 +97,16 @@ public class TestMessageIO {
   }
 
   /**
-   * Verifies that reading malformed message that exceeds the maximum
-   * bytes size should cause expected error.
+   * Verifies that reading malformed messages that has mismatched size
+   * and body (not enough data in the body) should cause expected error.
    */
   @Test
-  public void testMalformedMessageExceedMaxBytes() {
-    byte[] size = MessageIO.intToBytes(SubprocessConfiguration.MAX_MESSAGE_BYTES_DEFAULT + 1);
-    byte[] body = new byte[0];
+  public void testMalformedMessageMismatchSize() {
+    byte[] size = MessageIO.intToBytes(100);
+    byte[] body = new byte[10];
+    Arrays.fill(body, (byte)0);
     byte[] malformedMessage = Bytes.concat(size, body);
-    ByteArrayInputStream byteInputStream = new ByteArrayInputStream(malformedMessage);
-    BufferedInputStream in = new BufferedInputStream(byteInputStream);
+    BufferedInputStream in = new BufferedInputStream(new ByteArrayInputStream(malformedMessage));
     MessageIO messageIO = new MessageIO(SubprocessConfiguration.MAX_MESSAGE_BYTES_DEFAULT,
                                         in, /* out= */null);
     Throwable thrown = Assert.assertThrows(IOException.class, new ThrowingRunnable() {
@@ -115,28 +115,43 @@ public class TestMessageIO {
         messageIO.readBytes();
       }
     });
-    Assert.assertTrue(thrown.getMessage().contains("exceeds maximum message size"));
+    Assert.assertTrue(thrown.getMessage().contains("unable to receive message"));
   }
 
   /**
-   * Verifies that reading malformed messages that has mismatched size
-   * and body (not enough data in the body) should cause expected error.
+   * Verify that KuduSubprocessException is thrown by MessageIO.readBytes() when
+   * an oversized message is detected in the input stream. After the oversized
+   * message is read and discarded, next message can be read from the stream.
    */
   @Test
-  public void testMalformedMessageMismatchSize() {
-    byte[] size = MessageIO.intToBytes(100);
-    byte[] body = new byte[10];
-    Arrays.fill(body, (byte)0);
-    byte[] malformedMessage = Bytes.concat(size, body);
-    BufferedInputStream in = new BufferedInputStream(new ByteArrayInputStream(malformedMessage));
-    MessageIO messageIO = new MessageIO(SubprocessConfiguration.MAX_MESSAGE_BYTES_DEFAULT,
-                                        in, /* out= */null);
-    Throwable thrown = Assert.assertThrows(IOException.class, new ThrowingRunnable() {
+  public void testOversizedMessage() throws Exception {
+    final int maxMessageSize = 32;
+    byte[] size0 = MessageIO.intToBytes(maxMessageSize + 1);
+    byte[] body0 = new byte[maxMessageSize + 1];
+    Arrays.fill(body0, (byte) 0);
+    byte[] msg0 = Bytes.concat(size0, body0);
+
+    byte[] size1 = MessageIO.intToBytes(maxMessageSize);
+    byte[] body1 = new byte[maxMessageSize];
+    Arrays.fill(body1, (byte) 1);
+    byte[] msg1 = Bytes.concat(size1, body1);
+
+    byte[] msg = Bytes.concat(msg0, msg1);
+
+    BufferedInputStream in = new BufferedInputStream(new ByteArrayInputStream(msg));
+
+    MessageIO messageIO = new MessageIO(maxMessageSize, in, /* out= */null);
+    Throwable thrown = Assert.assertThrows(KuduSubprocessException.class, new ThrowingRunnable() {
       @Override
       public void run() throws Exception {
         messageIO.readBytes();
       }
     });
-    Assert.assertTrue(thrown.getMessage().contains("unable to receive message"));
+    Assert.assertTrue(thrown.getMessage().contains(
+        "message size (33) exceeds maximum message size (32): message is discarded"));
+
+    byte[] readMsg = messageIO.readBytes();
+    Assert.assertEquals(maxMessageSize, readMsg.length);
+    Assert.assertArrayEquals(body1, readMsg);
   }
 }
diff --git a/src/kudu/ranger/ranger_client.cc b/src/kudu/ranger/ranger_client.cc
index ef3bd6d62..3046358f4 100644
--- a/src/kudu/ranger/ranger_client.cc
+++ b/src/kudu/ranger/ranger_client.cc
@@ -100,6 +100,7 @@ TAG_FLAG(ranger_logtostdout, evolving);
 
 DECLARE_int32(max_log_files);
 DECLARE_uint32(max_log_size);
+DECLARE_uint32(subprocess_max_message_size_bytes);
 DECLARE_string(log_dir);
 
 METRIC_DEFINE_histogram(server, ranger_subprocess_execution_time_ms,
@@ -364,6 +365,8 @@ Status BuildArgv(const string& fifo_path, const string& log_properties_path,
     ret.emplace_back("-k");
     ret.emplace_back(FLAGS_keytab_file);
   }
+  ret.emplace_back("-m");
+  ret.emplace_back(std::to_string(FLAGS_subprocess_max_message_size_bytes));
   ret.emplace_back("-o");
   ret.emplace_back(fifo_path);
   *argv = std::move(ret);