You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ad...@apache.org on 2022/10/26 10:16:42 UTC

[ozone] branch master updated: HDDS-7342. Move encryption-related code from MultipartCryptoKeyInputStream to OzoneCryptoInputStream (#3852)

This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new a664ccac6a HDDS-7342. Move encryption-related code from MultipartCryptoKeyInputStream to OzoneCryptoInputStream (#3852)
a664ccac6a is described below

commit a664ccac6a52bd01619ebb29a76f8c171a71f1cf
Author: Cyrill <cy...@gmail.com>
AuthorDate: Wed Oct 26 13:16:35 2022 +0300

    HDDS-7342. Move encryption-related code from MultipartCryptoKeyInputStream to OzoneCryptoInputStream (#3852)
---
 .../client/io/MultipartCryptoKeyInputStream.java   | 135 +-----------------
 .../ozone/client/io/OzoneCryptoInputStream.java    | 152 ++++++++++++++++++++-
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |   6 +-
 3 files changed, 156 insertions(+), 137 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/MultipartCryptoKeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/MultipartCryptoKeyInputStream.java
index 530084e0f3..c7fc21cbb2 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/MultipartCryptoKeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/MultipartCryptoKeyInputStream.java
@@ -65,17 +65,6 @@ public class MultipartCryptoKeyInputStream extends OzoneInputStream
   // can be reset if a new position is seeked.
   private int prevPartIndex = 0;
 
-  // If a read's start/ length position doesn't coincide with a Crypto buffer
-  // boundary, it will be adjusted as reads should happen only at the buffer
-  // boundaries for decryption to happen correctly. In this case, after the
-  // data has been read and decrypted, only the requested data should be
-  // returned to the client. readPositionAdjustedBy and readLengthAdjustedBy
-  // store these adjustment information. Before returning to client, the first
-  // readPositionAdjustedBy number of bytes and the last readLengthAdjustedBy
-  // number of bytes must be discarded.
-  private int readPositionAdjustedBy = 0;
-  private int readLengthAdjustedBy = 0;
-
   public MultipartCryptoKeyInputStream(String keyName,
       List<OzoneCryptoInputStream> inputStreams) {
 
@@ -130,65 +119,7 @@ public class MultipartCryptoKeyInputStream extends OzoneInputStream
 
       // Get the current partStream and read data from it
       OzoneCryptoInputStream current = partStreams.get(partIndex);
-      // CryptoInputStream reads hadoop.security.crypto.buffer.size number of
-      // bytes (default 8KB) at a time. This needs to be taken into account
-      // in calculating the numBytesToRead.
-      int numBytesToRead = getNumBytesToRead(len, (int)current.getRemaining(),
-          current.getBufferSize());
-      int numBytesRead;
-
-      if (readPositionAdjustedBy != 0 || readLengthAdjustedBy != 0) {
-        // There was some adjustment made in position and/ or length of data
-        // to be read to account for Crypto buffer boundary. Hence, read the
-        // data into a temp buffer and then copy only the requested data into
-        // clients buffer.
-        byte[] tempBuffer = new byte[numBytesToRead];
-        int actualNumBytesRead = current.read(tempBuffer, 0,
-            numBytesToRead);
-        numBytesRead = actualNumBytesRead - readPositionAdjustedBy -
-            readLengthAdjustedBy;
-
-        if (actualNumBytesRead != numBytesToRead) {
-          throw new IOException(String.format("Inconsistent read for key=%s " +
-                  "part=%d length=%d numBytesToRead(accounting for Crypto " +
-                  "boundaries)=%d numBytesRead(actual)=%d " +
-                  "numBytesToBeRead(into client buffer discarding crypto " +
-                  "boundary adjustments)=%d",
-              key, partIndex, current.getLength(), numBytesToRead,
-              actualNumBytesRead, numBytesRead));
-        }
-
-        // TODO: Byte array copies are not optimal. If there is a better and
-        //  more optimal solution to copy only a part of read data into
-        //  client buffer, this should be updated.
-        System.arraycopy(tempBuffer, readPositionAdjustedBy, b, off,
-            numBytesRead);
-
-        LOG.debug("OzoneCryptoInputStream for key: {} part: {} read {} bytes " +
-                "instead of {} bytes to account for Crypto buffer boundary. " +
-                "Client buffer will be copied with read data from position {}" +
-                "upto position {}, discarding the extra bytes read to " +
-                "maintain Crypto buffer boundary limits", key, partIndex,
-            actualNumBytesRead, numBytesRead, readPositionAdjustedBy,
-            actualNumBytesRead - readPositionAdjustedBy);
-
-        if (readLengthAdjustedBy > 0) {
-          current.seek(current.getPos() - readLengthAdjustedBy);
-        }
-
-        // Reset readPositionAdjustedBy and readLengthAdjustedBy
-        readPositionAdjustedBy = 0;
-        readLengthAdjustedBy = 0;
-      } else {
-        numBytesRead = current.read(b, off, numBytesToRead);
-        if (numBytesRead != numBytesToRead) {
-          throw new IOException(String.format("Inconsistent read for key=%s " +
-                  "part=%d length=%d numBytesToRead=%d numBytesRead=%d",
-              key, partIndex, current.getLength(), numBytesToRead,
-              numBytesRead));
-        }
-      }
-
+      int numBytesRead = current.read(b, off, len);
       totalReadLen += numBytesRead;
       off += numBytesRead;
       len -= numBytesRead;
@@ -202,70 +133,6 @@ public class MultipartCryptoKeyInputStream extends OzoneInputStream
     return totalReadLen;
   }
 
-  /**
-   * Get number of bytes to read from the current stream based on the length
-   * to be read, number of bytes remaining in the stream and the Crypto buffer
-   * size.
-   * Reads should be performed at the CryptoInputStream Buffer boundaries only.
-   * Otherwise, the decryption will be incorrect.
-   */
-  private int getNumBytesToRead(int lenToRead, int remaining,
-      int cryptoBufferSize) throws IOException {
-
-    Preconditions.checkArgument(readPositionAdjustedBy == 0);
-    Preconditions.checkArgument(readLengthAdjustedBy == 0);
-
-    // Check and adjust position if required
-    adjustReadPosition(cryptoBufferSize);
-    remaining += readPositionAdjustedBy;
-    lenToRead += readPositionAdjustedBy;
-
-    return adjustNumBytesToRead(lenToRead, remaining, cryptoBufferSize);
-  }
-
-  /**
-   * Reads should be performed at the CryptoInputStream Buffer boundary size.
-   * Otherwise, the decryption will be incorrect. Hence, if the position is
-   * not at the boundary limit, we have to adjust the position and might need
-   * to read more data than requested. The extra data will be filtered out
-   * before returning to the client.
-   */
-  private void adjustReadPosition(long cryptoBufferSize) throws IOException {
-    // Position of the buffer in current stream
-    long currentPosOfStream = partStreams.get(partIndex).getPos();
-    int modulus = (int) (currentPosOfStream % cryptoBufferSize);
-    if (modulus != 0) {
-      // Adjustment required.
-      // Update readPositionAdjustedBy and seek to the adjusted position
-      readPositionAdjustedBy = modulus;
-      // Seek current partStream to adjusted position. We do not need to
-      // reset the seeked positions of other streams.
-      partStreams.get(partIndex)
-          .seek(currentPosOfStream - readPositionAdjustedBy);
-      LOG.debug("OzoneCryptoInputStream for key: {} part: {} adjusted " +
-              "position {} by -{} to account for Crypto buffer boundary",
-          key, partIndex, currentPosOfStream, readPositionAdjustedBy);
-    }
-  }
-
-  /**
-   * If the length of data requested does not end at a Crypto Buffer
-   * boundary, the number of bytes to be read must be adjusted accordingly.
-   * The extra data will be filtered out before returning to the client.
-   */
-  private int adjustNumBytesToRead(int lenToRead, int remaining,
-      int cryptoBufferSize) {
-    int numBytesToRead = Math.min(cryptoBufferSize, remaining);
-    if (lenToRead < numBytesToRead) {
-      // Adjustment required; Update readLengthAdjustedBy.
-      readLengthAdjustedBy = numBytesToRead - lenToRead;
-      LOG.debug("OzoneCryptoInputStream for key: {} part: {} adjusted length " +
-              "by +{} to account for Crypto buffer boundary",
-          key, partIndex, readLengthAdjustedBy);
-    }
-    return numBytesToRead;
-  }
-
   /**
    * Seeks the InputStream to the specified position. This involves 2 steps:
    *    1. Updating the partIndex to the partStream corresponding to the
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneCryptoInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneCryptoInputStream.java
index 9d5d888688..1d0cb2bb82 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneCryptoInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneCryptoInputStream.java
@@ -19,10 +19,14 @@
 package org.apache.hadoop.ozone.client.io;
 
 import java.io.IOException;
+
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.crypto.CryptoCodec;
 import org.apache.hadoop.crypto.CryptoInputStream;
 import org.apache.hadoop.crypto.CryptoStreamUtils;
 import org.apache.hadoop.fs.Seekable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A CryptoInputStream for Ozone with length. This stream is used to read
@@ -31,16 +35,35 @@ import org.apache.hadoop.fs.Seekable;
 public class OzoneCryptoInputStream extends CryptoInputStream
     implements Seekable {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OzoneCryptoInputStream.class);
+
   private final long length;
   private final int bufferSize;
+  private final String keyName;
+  private final int partIndex;
+
+  // If a read's start position or length doesn't coincide with a Crypto
+  // buffer boundary, it is adjusted, because reads must happen only at buffer
+  // boundaries for decryption to work correctly. In this case, after the
+  // data has been read and decrypted, only the requested data should be
+  // returned to the client. readPositionAdjustedBy and readLengthAdjustedBy
+  // store the sizes of these adjustments. Before returning to the client, the
+  // first readPositionAdjustedBy bytes and the last readLengthAdjustedBy
+  // bytes must be discarded.
+  private int readPositionAdjustedBy = 0;
+  private int readLengthAdjustedBy = 0;
 
   public OzoneCryptoInputStream(LengthInputStream in,
-      CryptoCodec codec, byte[] key, byte[] iv) throws IOException {
+      CryptoCodec codec, byte[] key, byte[] iv,
+      String keyName, int partIndex) throws IOException {
     super(in.getWrappedStream(), codec, key, iv);
     this.length = in.getLength();
     // This is the buffer size used while creating the CryptoInputStream
     // internally
     this.bufferSize = CryptoStreamUtils.getBufferSize(codec.getConf());
+    this.keyName = keyName;
+    this.partIndex = partIndex;
   }
 
   public long getLength() {
@@ -54,4 +77,131 @@ public class OzoneCryptoInputStream extends CryptoInputStream
   public long getRemaining() throws IOException {
     return length - getPos();
   }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    // CryptoInputStream reads hadoop.security.crypto.buffer.size number of
+    // bytes (default 8KB) at a time. This needs to be taken into account
+    // in calculating the numBytesToRead.
+    int numBytesToRead = getNumBytesToRead(len, (int)getRemaining(),
+        getBufferSize());
+    int numBytesRead;
+
+    if (readPositionAdjustedBy != 0 || readLengthAdjustedBy != 0) {
+      // Some adjustment was made to the position and/or length of the data
+      // to be read to account for the Crypto buffer boundary. Hence, read the
+      // data into a temp buffer and then copy only the requested data into
+      // the client's buffer.
+      byte[] tempBuffer = new byte[numBytesToRead];
+      int actualNumBytesRead = super.read(tempBuffer, 0,
+          numBytesToRead);
+      numBytesRead = actualNumBytesRead - readPositionAdjustedBy -
+          readLengthAdjustedBy;
+
+      if (actualNumBytesRead != numBytesToRead) {
+        throw new IOException(String.format("Inconsistent read for key=%s " +
+                "part=%d length=%d numBytesToRead(accounting for Crypto " +
+                "boundaries)=%d numBytesRead(actual)=%d " +
+                "numBytesToBeRead(into client buffer discarding crypto " +
+                "boundary adjustments)=%d",
+            keyName, partIndex, getLength(), numBytesToRead,
+            actualNumBytesRead, numBytesRead));
+      }
+
+      // TODO: Byte array copies are not optimal. If there is a better and
+      //  more optimal solution to copy only a part of read data into
+      //  client buffer, this should be updated.
+      System.arraycopy(tempBuffer, readPositionAdjustedBy, b, off,
+          numBytesRead);
+
+      LOG.debug("OzoneCryptoInputStream for key: {} part: {} read {} bytes " +
+              "instead of {} bytes to account for Crypto buffer boundary. " +
+              "Client buffer will be copied with read data from position {}" +
+              "upto position {}, discarding the extra bytes read to " +
+              "maintain Crypto buffer boundary limits", keyName, partIndex,
+          actualNumBytesRead, numBytesRead, readPositionAdjustedBy,
+          actualNumBytesRead - readPositionAdjustedBy);
+
+      if (readLengthAdjustedBy > 0) {
+        seek(getPos() - readLengthAdjustedBy);
+      }
+
+      // Reset readPositionAdjustedBy and readLengthAdjustedBy
+      readPositionAdjustedBy = 0;
+      readLengthAdjustedBy = 0;
+    } else {
+      numBytesRead = super.read(b, off, numBytesToRead);
+      if (numBytesRead != numBytesToRead) {
+        throw new IOException(String.format("Inconsistent read for key=%s " +
+                "part=%d length=%d numBytesToRead=%d numBytesRead=%d",
+            keyName, partIndex, getLength(), numBytesToRead,
+            numBytesRead));
+      }
+    }
+    return numBytesRead;
+  }
+
+  /**
+   * Get number of bytes to read from the current stream based on the length
+   * to be read, number of bytes remaining in the stream and the Crypto buffer
+   * size.
+   * Reads should be performed at the CryptoInputStream Buffer boundaries only.
+   * Otherwise, the decryption will be incorrect.
+   */
+  private int getNumBytesToRead(int lenToRead, int remaining,
+                                int cryptoBufferSize) throws IOException {
+
+    Preconditions.checkArgument(readPositionAdjustedBy == 0);
+    Preconditions.checkArgument(readLengthAdjustedBy == 0);
+
+    // Check and adjust position if required
+    adjustReadPosition(cryptoBufferSize);
+    remaining += readPositionAdjustedBy;
+    lenToRead += readPositionAdjustedBy;
+
+    return adjustNumBytesToRead(lenToRead, remaining, cryptoBufferSize);
+  }
+
+  /**
+   * Reads should be performed at the CryptoInputStream Buffer boundary size.
+   * Otherwise, the decryption will be incorrect. Hence, if the position is
+   * not at the boundary limit, we have to adjust the position and might need
+   * to read more data than requested. The extra data will be filtered out
+   * before returning to the client.
+   */
+  private void adjustReadPosition(long cryptoBufferSize) throws IOException {
+    // Position of the buffer in current stream
+    long currentPosOfStream = getPos();
+    int modulus = (int) (currentPosOfStream % cryptoBufferSize);
+    if (modulus != 0) {
+      // Adjustment required.
+      // Update readPositionAdjustedBy and seek to the adjusted position
+      readPositionAdjustedBy = modulus;
+      // Seek this stream back to the adjusted (buffer-aligned) position.
+      // The read that follows discards the extra leading bytes.
+      seek(currentPosOfStream - readPositionAdjustedBy);
+      LOG.debug("OzoneCryptoInputStream for key: {} part: {} adjusted " +
+              "position {} by -{} to account for Crypto buffer boundary",
+          keyName, partIndex, currentPosOfStream, readPositionAdjustedBy);
+    }
+  }
+
+  /**
+   * If the length of data requested does not end at a Crypto Buffer
+   * boundary, the number of bytes to be read must be adjusted accordingly.
+   * The extra data will be filtered out before returning to the client.
+   */
+  private int adjustNumBytesToRead(int lenToRead, int remaining,
+                                   int cryptoBufferSize) {
+    int numBytesToRead = Math.min(cryptoBufferSize, remaining);
+    if (lenToRead < numBytesToRead) {
+      // Adjustment required; Update readLengthAdjustedBy.
+      readLengthAdjustedBy = numBytesToRead - lenToRead;
+      LOG.debug("OzoneCryptoInputStream for key: {} part: {} adjusted length " +
+              "by +{} to account for Crypto buffer boundary",
+          keyName, partIndex, readLengthAdjustedBy);
+    }
+    return numBytesToRead;
+  }
+
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 973f98ff3b..f3f87c52a8 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -1865,11 +1865,13 @@ public class RpcClient implements ClientProtocol {
       final KeyProvider.KeyVersion decrypted = getDEK(feInfo);
 
       List<OzoneCryptoInputStream> cryptoInputStreams = new ArrayList<>();
-      for (LengthInputStream lengthInputStream : lengthInputStreams) {
+      for (int i = 0; i < lengthInputStreams.size(); i++) {
+        LengthInputStream lengthInputStream = lengthInputStreams.get(i);
         final OzoneCryptoInputStream ozoneCryptoInputStream =
             new OzoneCryptoInputStream(lengthInputStream,
                 OzoneKMSUtil.getCryptoCodec(conf, feInfo),
-                decrypted.getMaterial(), feInfo.getIV());
+                decrypted.getMaterial(), feInfo.getIV(),
+                keyInfo.getKeyName(), i);
         cryptoInputStreams.add(ozoneCryptoInputStream);
       }
       return new MultipartCryptoKeyInputStream(keyInfo.getKeyName(),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org