You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ad...@apache.org on 2022/10/26 10:16:42 UTC
[ozone] branch master updated: HDDS-7342. Move encryption-related code from MultipartCryptoKeyInputStream to OzoneCryptoInputStream (#3852)
This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new a664ccac6a HDDS-7342. Move encryption-related code from MultipartCryptoKeyInputStream to OzoneCryptoInputStream (#3852)
a664ccac6a is described below
commit a664ccac6a52bd01619ebb29a76f8c171a71f1cf
Author: Cyrill <cy...@gmail.com>
AuthorDate: Wed Oct 26 13:16:35 2022 +0300
HDDS-7342. Move encryption-related code from MultipartCryptoKeyInputStream to OzoneCryptoInputStream (#3852)
---
.../client/io/MultipartCryptoKeyInputStream.java | 135 +-----------------
.../ozone/client/io/OzoneCryptoInputStream.java | 152 ++++++++++++++++++++-
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 6 +-
3 files changed, 156 insertions(+), 137 deletions(-)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/MultipartCryptoKeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/MultipartCryptoKeyInputStream.java
index 530084e0f3..c7fc21cbb2 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/MultipartCryptoKeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/MultipartCryptoKeyInputStream.java
@@ -65,17 +65,6 @@ public class MultipartCryptoKeyInputStream extends OzoneInputStream
// can be reset if a new position is seeked.
private int prevPartIndex = 0;
- // If a read's start/ length position doesn't coincide with a Crypto buffer
- // boundary, it will be adjusted as reads should happen only at the buffer
- // boundaries for decryption to happen correctly. In this case, after the
- // data has been read and decrypted, only the requested data should be
- // returned to the client. readPositionAdjustedBy and readLengthAdjustedBy
- // store these adjustment information. Before returning to client, the first
- // readPositionAdjustedBy number of bytes and the last readLengthAdjustedBy
- // number of bytes must be discarded.
- private int readPositionAdjustedBy = 0;
- private int readLengthAdjustedBy = 0;
-
public MultipartCryptoKeyInputStream(String keyName,
List<OzoneCryptoInputStream> inputStreams) {
@@ -130,65 +119,7 @@ public class MultipartCryptoKeyInputStream extends OzoneInputStream
// Get the current partStream and read data from it
OzoneCryptoInputStream current = partStreams.get(partIndex);
- // CryptoInputStream reads hadoop.security.crypto.buffer.size number of
- // bytes (default 8KB) at a time. This needs to be taken into account
- // in calculating the numBytesToRead.
- int numBytesToRead = getNumBytesToRead(len, (int)current.getRemaining(),
- current.getBufferSize());
- int numBytesRead;
-
- if (readPositionAdjustedBy != 0 || readLengthAdjustedBy != 0) {
- // There was some adjustment made in position and/ or length of data
- // to be read to account for Crypto buffer boundary. Hence, read the
- // data into a temp buffer and then copy only the requested data into
- // clients buffer.
- byte[] tempBuffer = new byte[numBytesToRead];
- int actualNumBytesRead = current.read(tempBuffer, 0,
- numBytesToRead);
- numBytesRead = actualNumBytesRead - readPositionAdjustedBy -
- readLengthAdjustedBy;
-
- if (actualNumBytesRead != numBytesToRead) {
- throw new IOException(String.format("Inconsistent read for key=%s " +
- "part=%d length=%d numBytesToRead(accounting for Crypto " +
- "boundaries)=%d numBytesRead(actual)=%d " +
- "numBytesToBeRead(into client buffer discarding crypto " +
- "boundary adjustments)=%d",
- key, partIndex, current.getLength(), numBytesToRead,
- actualNumBytesRead, numBytesRead));
- }
-
- // TODO: Byte array copies are not optimal. If there is a better and
- // more optimal solution to copy only a part of read data into
- // client buffer, this should be updated.
- System.arraycopy(tempBuffer, readPositionAdjustedBy, b, off,
- numBytesRead);
-
- LOG.debug("OzoneCryptoInputStream for key: {} part: {} read {} bytes " +
- "instead of {} bytes to account for Crypto buffer boundary. " +
- "Client buffer will be copied with read data from position {}" +
- "upto position {}, discarding the extra bytes read to " +
- "maintain Crypto buffer boundary limits", key, partIndex,
- actualNumBytesRead, numBytesRead, readPositionAdjustedBy,
- actualNumBytesRead - readPositionAdjustedBy);
-
- if (readLengthAdjustedBy > 0) {
- current.seek(current.getPos() - readLengthAdjustedBy);
- }
-
- // Reset readPositionAdjustedBy and readLengthAdjustedBy
- readPositionAdjustedBy = 0;
- readLengthAdjustedBy = 0;
- } else {
- numBytesRead = current.read(b, off, numBytesToRead);
- if (numBytesRead != numBytesToRead) {
- throw new IOException(String.format("Inconsistent read for key=%s " +
- "part=%d length=%d numBytesToRead=%d numBytesRead=%d",
- key, partIndex, current.getLength(), numBytesToRead,
- numBytesRead));
- }
- }
-
+ int numBytesRead = current.read(b, off, len);
totalReadLen += numBytesRead;
off += numBytesRead;
len -= numBytesRead;
@@ -202,70 +133,6 @@ public class MultipartCryptoKeyInputStream extends OzoneInputStream
return totalReadLen;
}
- /**
- * Get number of bytes to read from the current stream based on the length
- * to be read, number of bytes remaining in the stream and the Crypto buffer
- * size.
- * Reads should be performed at the CryptoInputStream Buffer boundaries only.
- * Otherwise, the decryption will be incorrect.
- */
- private int getNumBytesToRead(int lenToRead, int remaining,
- int cryptoBufferSize) throws IOException {
-
- Preconditions.checkArgument(readPositionAdjustedBy == 0);
- Preconditions.checkArgument(readLengthAdjustedBy == 0);
-
- // Check and adjust position if required
- adjustReadPosition(cryptoBufferSize);
- remaining += readPositionAdjustedBy;
- lenToRead += readPositionAdjustedBy;
-
- return adjustNumBytesToRead(lenToRead, remaining, cryptoBufferSize);
- }
-
- /**
- * Reads should be performed at the CryptoInputStream Buffer boundary size.
- * Otherwise, the decryption will be incorrect. Hence, if the position is
- * not at the boundary limit, we have to adjust the position and might need
- * to read more data than requested. The extra data will be filtered out
- * before returning to the client.
- */
- private void adjustReadPosition(long cryptoBufferSize) throws IOException {
- // Position of the buffer in current stream
- long currentPosOfStream = partStreams.get(partIndex).getPos();
- int modulus = (int) (currentPosOfStream % cryptoBufferSize);
- if (modulus != 0) {
- // Adjustment required.
- // Update readPositionAdjustedBy and seek to the adjusted position
- readPositionAdjustedBy = modulus;
- // Seek current partStream to adjusted position. We do not need to
- // reset the seeked positions of other streams.
- partStreams.get(partIndex)
- .seek(currentPosOfStream - readPositionAdjustedBy);
- LOG.debug("OzoneCryptoInputStream for key: {} part: {} adjusted " +
- "position {} by -{} to account for Crypto buffer boundary",
- key, partIndex, currentPosOfStream, readPositionAdjustedBy);
- }
- }
-
- /**
- * If the length of data requested does not end at a Crypto Buffer
- * boundary, the number of bytes to be read must be adjusted accordingly.
- * The extra data will be filtered out before returning to the client.
- */
- private int adjustNumBytesToRead(int lenToRead, int remaining,
- int cryptoBufferSize) {
- int numBytesToRead = Math.min(cryptoBufferSize, remaining);
- if (lenToRead < numBytesToRead) {
- // Adjustment required; Update readLengthAdjustedBy.
- readLengthAdjustedBy = numBytesToRead - lenToRead;
- LOG.debug("OzoneCryptoInputStream for key: {} part: {} adjusted length " +
- "by +{} to account for Crypto buffer boundary",
- key, partIndex, readLengthAdjustedBy);
- }
- return numBytesToRead;
- }
-
/**
* Seeks the InputStream to the specified position. This involves 2 steps:
* 1. Updating the partIndex to the partStream corresponding to the
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneCryptoInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneCryptoInputStream.java
index 9d5d888688..1d0cb2bb82 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneCryptoInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneCryptoInputStream.java
@@ -19,10 +19,14 @@
package org.apache.hadoop.ozone.client.io;
import java.io.IOException;
+
+import com.google.common.base.Preconditions;
import org.apache.hadoop.crypto.CryptoCodec;
import org.apache.hadoop.crypto.CryptoInputStream;
import org.apache.hadoop.crypto.CryptoStreamUtils;
import org.apache.hadoop.fs.Seekable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A CryptoInputStream for Ozone with length. This stream is used to read
@@ -31,16 +35,35 @@ import org.apache.hadoop.fs.Seekable;
public class OzoneCryptoInputStream extends CryptoInputStream
implements Seekable {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(OzoneCryptoInputStream.class);
+
private final long length;
private final int bufferSize;
+ private final String keyName;
+ private final int partIndex;
+
+ // If a read's start/ length position doesn't coincide with a Crypto buffer
+ // boundary, it will be adjusted as reads should happen only at the buffer
+ // boundaries for decryption to happen correctly. In this case, after the
+ // data has been read and decrypted, only the requested data should be
+ // returned to the client. readPositionAdjustedBy and readLengthAdjustedBy
+ // store this adjustment information. Before returning to the client, the first
+ // readPositionAdjustedBy number of bytes and the last readLengthAdjustedBy
+ // number of bytes must be discarded.
+ private int readPositionAdjustedBy = 0;
+ private int readLengthAdjustedBy = 0;
public OzoneCryptoInputStream(LengthInputStream in,
- CryptoCodec codec, byte[] key, byte[] iv) throws IOException {
+ CryptoCodec codec, byte[] key, byte[] iv,
+ String keyName, int partIndex) throws IOException {
super(in.getWrappedStream(), codec, key, iv);
this.length = in.getLength();
// This is the buffer size used while creating the CryptoInputStream
// internally
this.bufferSize = CryptoStreamUtils.getBufferSize(codec.getConf());
+ this.keyName = keyName;
+ this.partIndex = partIndex;
}
public long getLength() {
@@ -54,4 +77,131 @@ public class OzoneCryptoInputStream extends CryptoInputStream
public long getRemaining() throws IOException {
return length - getPos();
}
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ // CryptoInputStream reads hadoop.security.crypto.buffer.size number of
+ // bytes (default 8KB) at a time. This needs to be taken into account
+ // in calculating the numBytesToRead.
+ int numBytesToRead = getNumBytesToRead(len, (int)getRemaining(),
+ getBufferSize());
+ int numBytesRead;
+
+ if (readPositionAdjustedBy != 0 || readLengthAdjustedBy != 0) {
+ // There was some adjustment made in position and/ or length of data
+ // to be read to account for Crypto buffer boundary. Hence, read the
+ // data into a temp buffer and then copy only the requested data into
+ // client's buffer.
+ byte[] tempBuffer = new byte[numBytesToRead];
+ int actualNumBytesRead = super.read(tempBuffer, 0,
+ numBytesToRead);
+ numBytesRead = actualNumBytesRead - readPositionAdjustedBy -
+ readLengthAdjustedBy;
+
+ if (actualNumBytesRead != numBytesToRead) {
+ throw new IOException(String.format("Inconsistent read for key=%s " +
+ "part=%d length=%d numBytesToRead(accounting for Crypto " +
+ "boundaries)=%d numBytesRead(actual)=%d " +
+ "numBytesToBeRead(into client buffer discarding crypto " +
+ "boundary adjustments)=%d",
+ keyName, partIndex, getLength(), numBytesToRead,
+ actualNumBytesRead, numBytesRead));
+ }
+
+ // TODO: Byte array copies are not optimal. If there is a better and
+ // more optimal solution to copy only a part of read data into
+ // client buffer, this should be updated.
+ System.arraycopy(tempBuffer, readPositionAdjustedBy, b, off,
+ numBytesRead);
+
+ LOG.debug("OzoneCryptoInputStream for key: {} part: {} read {} bytes " +
+ "instead of {} bytes to account for Crypto buffer boundary. " +
+ "Client buffer will be copied with read data from position {}" +
+ "upto position {}, discarding the extra bytes read to " +
+ "maintain Crypto buffer boundary limits", keyName, partIndex,
+ actualNumBytesRead, numBytesRead, readPositionAdjustedBy,
+ actualNumBytesRead - readPositionAdjustedBy);
+
+ if (readLengthAdjustedBy > 0) {
+ seek(getPos() - readLengthAdjustedBy);
+ }
+
+ // Reset readPositionAdjustedBy and readLengthAdjustedBy
+ readPositionAdjustedBy = 0;
+ readLengthAdjustedBy = 0;
+ } else {
+ numBytesRead = super.read(b, off, numBytesToRead);
+ if (numBytesRead != numBytesToRead) {
+ throw new IOException(String.format("Inconsistent read for key=%s " +
+ "part=%d length=%d numBytesToRead=%d numBytesRead=%d",
+ keyName, partIndex, getLength(), numBytesToRead,
+ numBytesRead));
+ }
+ }
+ return numBytesRead;
+ }
+
+ /**
+ * Get number of bytes to read from the current stream based on the length
+ * to be read, number of bytes remaining in the stream and the Crypto buffer
+ * size.
+ * Reads should be performed at the CryptoInputStream Buffer boundaries only.
+ * Otherwise, the decryption will be incorrect.
+ */
+ private int getNumBytesToRead(int lenToRead, int remaining,
+ int cryptoBufferSize) throws IOException {
+
+ Preconditions.checkArgument(readPositionAdjustedBy == 0);
+ Preconditions.checkArgument(readLengthAdjustedBy == 0);
+
+ // Check and adjust position if required
+ adjustReadPosition(cryptoBufferSize);
+ remaining += readPositionAdjustedBy;
+ lenToRead += readPositionAdjustedBy;
+
+ return adjustNumBytesToRead(lenToRead, remaining, cryptoBufferSize);
+ }
+
+ /**
+ * Reads should be performed at the CryptoInputStream Buffer boundary size.
+ * Otherwise, the decryption will be incorrect. Hence, if the position is
+ * not at the boundary limit, we have to adjust the position and might need
+ * to read more data than requested. The extra data will be filtered out
+ * before returning to the client.
+ */
+ private void adjustReadPosition(long cryptoBufferSize) throws IOException {
+ // Position of the buffer in current stream
+ long currentPosOfStream = getPos();
+ int modulus = (int) (currentPosOfStream % cryptoBufferSize);
+ if (modulus != 0) {
+ // Adjustment required.
+ // Update readPositionAdjustedBy and seek to the adjusted position
+ readPositionAdjustedBy = modulus;
+ // Seek this stream to the adjusted position. We do not need to
+ // reset the seeked positions of other streams.
+ seek(currentPosOfStream - readPositionAdjustedBy);
+ LOG.debug("OzoneCryptoInputStream for key: {} part: {} adjusted " +
+ "position {} by -{} to account for Crypto buffer boundary",
+ keyName, partIndex, currentPosOfStream, readPositionAdjustedBy);
+ }
+ }
+
+ /**
+ * If the length of data requested does not end at a Crypto Buffer
+ * boundary, the number of bytes to be read must be adjusted accordingly.
+ * The extra data will be filtered out before returning to the client.
+ */
+ private int adjustNumBytesToRead(int lenToRead, int remaining,
+ int cryptoBufferSize) {
+ int numBytesToRead = Math.min(cryptoBufferSize, remaining);
+ if (lenToRead < numBytesToRead) {
+ // Adjustment required; Update readLengthAdjustedBy.
+ readLengthAdjustedBy = numBytesToRead - lenToRead;
+ LOG.debug("OzoneCryptoInputStream for key: {} part: {} adjusted length " +
+ "by +{} to account for Crypto buffer boundary",
+ keyName, partIndex, readLengthAdjustedBy);
+ }
+ return numBytesToRead;
+ }
+
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 973f98ff3b..f3f87c52a8 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -1865,11 +1865,13 @@ public class RpcClient implements ClientProtocol {
final KeyProvider.KeyVersion decrypted = getDEK(feInfo);
List<OzoneCryptoInputStream> cryptoInputStreams = new ArrayList<>();
- for (LengthInputStream lengthInputStream : lengthInputStreams) {
+ for (int i = 0; i < lengthInputStreams.size(); i++) {
+ LengthInputStream lengthInputStream = lengthInputStreams.get(i);
final OzoneCryptoInputStream ozoneCryptoInputStream =
new OzoneCryptoInputStream(lengthInputStream,
OzoneKMSUtil.getCryptoCodec(conf, feInfo),
- decrypted.getMaterial(), feInfo.getIV());
+ decrypted.getMaterial(), feInfo.getIV(),
+ keyInfo.getKeyName(), i);
cryptoInputStreams.add(ozoneCryptoInputStream);
}
return new MultipartCryptoKeyInputStream(keyInfo.getKeyName(),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org