You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by pi...@apache.org on 2021/10/06 16:35:52 UTC
[ozone] branch master updated: HDDS-5816 Rearrange code and
refactor some logic into new methods in prep for EC addition. (#2709)
This is an automated email from the ASF dual-hosted git repository.
pifta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new ea81ef8 HDDS-5816 Rearrange code and refactor some logic into new methods in prep for EC addition. (#2709)
ea81ef8 is described below
commit ea81ef820827cdfca188c3881538d65f7359d375
Author: Istvan Fajth <pi...@cloudera.com>
AuthorDate: Wed Oct 6 18:35:29 2021 +0200
HDDS-5816 Rearrange code and refactor some logic into new methods in prep for EC addition. (#2709)
---
.../ozone/client/io/BlockOutputStreamEntry.java | 268 ++++++++++++++-------
.../client/io/BlockOutputStreamEntryPool.java | 109 +++++++--
.../hadoop/ozone/client/io/KeyOutputStream.java | 3 +-
3 files changed, 275 insertions(+), 105 deletions(-)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 548587c..45e8473 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -36,9 +36,14 @@ import org.apache.hadoop.security.token.Token;
import com.google.common.annotations.VisibleForTesting;
/**
- * Helper class used inside {@link BlockOutputStream}.
+ * A BlockOutputStreamEntry manages the data writes into the DataNodes.
+ * It wraps BlockOutputStreams that are connecting to the DataNodes,
+ * and in the meantime keeps track of the length of data successfully written.
+ *
+ * The base implementation is handling Ratis-3 writes, with a single stream,
+ * but there can be other implementations that are using a different way.
* */
-public final class BlockOutputStreamEntry extends OutputStream {
+public class BlockOutputStreamEntry extends OutputStream {
private final OzoneClientConfig config;
private OutputStream outputStream;
@@ -55,7 +60,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
private BufferPool bufferPool;
@SuppressWarnings({"parameternumber", "squid:S00107"})
- private BlockOutputStreamEntry(
+ BlockOutputStreamEntry(
BlockID blockID, String key,
XceiverClientFactory xceiverClientManager,
Pipeline pipeline,
@@ -76,74 +81,93 @@ public final class BlockOutputStreamEntry extends OutputStream {
this.bufferPool = bufferPool;
}
- long getLength() {
- return length;
- }
-
- Token<OzoneBlockTokenIdentifier> getToken() {
- return token;
- }
-
- long getRemaining() {
- return length - currentPosition;
- }
-
/**
* BlockOutputStream is initialized in this function. This makes sure that
* xceiverClient initialization is not done during preallocation and only
* done when data is written.
* @throws IOException if xceiverClient initialization fails
*/
- private void checkStream() throws IOException {
- if (this.outputStream == null) {
- this.outputStream =
- new RatisBlockOutputStream(blockID, xceiverClientManager,
- pipeline, bufferPool, config, token);
+ void checkStream() throws IOException {
+ if (!isInitialized()) {
+ createOutputStream();
}
}
+ /**
+ * Creates the outputStreams that are necessary to start the write.
+ * Implementors can override this to instantiate multiple streams instead.
+ * @throws IOException
+ */
+ void createOutputStream() throws IOException {
+ outputStream = new RatisBlockOutputStream(blockID, xceiverClientManager,
+ pipeline, bufferPool, config, token);
+ }
@Override
public void write(int b) throws IOException {
checkStream();
- outputStream.write(b);
- this.currentPosition += 1;
+ getOutputStream().write(b);
+ incCurrentPosition();
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
checkStream();
- outputStream.write(b, off, len);
- this.currentPosition += len;
+ getOutputStream().write(b, off, len);
+ incCurrentPosition(len);
+ }
+
+ void writeOnRetry(long len) throws IOException {
+ checkStream();
+ BlockOutputStream out = (BlockOutputStream) getOutputStream();
+ out.writeOnRetry(len);
+ incCurrentPosition(len);
}
@Override
public void flush() throws IOException {
- if (this.outputStream != null) {
- this.outputStream.flush();
+ if (isInitialized()) {
+ getOutputStream().flush();
}
}
@Override
public void close() throws IOException {
- if (this.outputStream != null) {
- this.outputStream.close();
+ if (isInitialized()) {
+ getOutputStream().close();
// after closing the chunkOutPutStream, blockId would have been
// reconstructed with updated bcsId
- this.blockID = ((BlockOutputStream) outputStream).getBlockID();
+ this.blockID = ((BlockOutputStream) getOutputStream()).getBlockID();
}
}
boolean isClosed() {
- if (outputStream != null) {
- return ((BlockOutputStream) outputStream).isClosed();
+ if (isInitialized()) {
+ return ((BlockOutputStream) getOutputStream()).isClosed();
}
return false;
}
+ void cleanup(boolean invalidateClient) throws IOException {
+ checkStream();
+ BlockOutputStream out = (BlockOutputStream) getOutputStream();
+ out.cleanup(invalidateClient);
+ }
+
+ /**
+ * If the underlying BlockOutputStream implements acknowledgement of the
+ * writes, this method returns the total number of bytes acknowledged to be
+ * stored by the DataNode peers.
+ * The default stream implementation returns zero, and if the used stream
+ * does not implement acknowledgement, this method returns zero.
+ *
+ * @return the number of bytes confirmed to be acknowledged by the underlying
+ * BlockOutputStream, or zero if acknowledgment logic is not implemented,
+ * or the entry is not initialized.
+ */
long getTotalAckDataLength() {
- if (outputStream != null) {
- BlockOutputStream out = (BlockOutputStream) this.outputStream;
+ if (isInitialized()) {
+ BlockOutputStream out = (BlockOutputStream) getOutputStream();
blockID = out.getBlockID();
return out.getTotalAckDataLength();
} else {
@@ -154,17 +178,13 @@ public final class BlockOutputStreamEntry extends OutputStream {
}
}
- Collection<DatanodeDetails> getFailedServers() {
- if (outputStream != null) {
- BlockOutputStream out = (BlockOutputStream) this.outputStream;
- return out.getFailedServers();
- }
- return Collections.emptyList();
- }
-
+ /**
+ * Returns the amount of bytes that were attempted to be sent through towards
+ * the DataNodes, and the write call succeeded without an exception.
+ */
long getWrittenDataLength() {
- if (outputStream != null) {
- BlockOutputStream out = (BlockOutputStream) this.outputStream;
+ if (isInitialized()) {
+ BlockOutputStream out = (BlockOutputStream) getOutputStream();
return out.getWrittenDataLength();
} else {
// For a pre allocated block for which no write has been initiated,
@@ -174,19 +194,127 @@ public final class BlockOutputStreamEntry extends OutputStream {
}
}
- void cleanup(boolean invalidateClient) throws IOException {
- checkStream();
- BlockOutputStream out = (BlockOutputStream) this.outputStream;
- out.cleanup(invalidateClient);
+ Collection<DatanodeDetails> getFailedServers() {
+ if (isInitialized()) {
+ BlockOutputStream out = (BlockOutputStream) getOutputStream();
+ return out.getFailedServers();
+ }
+ return Collections.emptyList();
+ }
+ /**
+ * Used to decide if the wrapped output stream is created already or not.
+ * @return true if the wrapped stream is already initialized.
+ */
+ boolean isInitialized() {
+ return getOutputStream() != null;
}
- void writeOnRetry(long len) throws IOException {
- checkStream();
- BlockOutputStream out = (BlockOutputStream) this.outputStream;
- out.writeOnRetry(len);
- this.currentPosition += len;
+ /**
+ * Gets the intended length of the key to be written.
+ * @return the length to be written into the key.
+ */
+ //TODO: this does not belong here...
+ long getLength() {
+ return this.length;
+ }
+
+ /**
+ * Gets the block token that is used to authenticate during the write.
+ * @return the block token for writing the data
+ */
+ Token<OzoneBlockTokenIdentifier> getToken() {
+ return this.token;
+ }
+
+ /**
+ * Gets the amount of bytes remaining from the full write.
+ * @return the amount of bytes to still be written to the key
+ */
+ //TODO: this does not belong here...
+ long getRemaining() {
+ return getLength() - getCurrentPosition();
+ }
+ /**
+ * Increases current position by the given length. Used in writes.
+ *
+ * @param len the amount of bytes to increase position with.
+ */
+ void incCurrentPosition(long len) {
+ currentPosition += len;
+ }
+
+ /**
+ * Increases current position by one. Used in writes.
+ */
+ void incCurrentPosition(){
+ currentPosition++;
+ }
+
+ /**
+ * In case of a failure this method can be used to reset the position back to
+ * the last position acked by a node before a write failure.
+ */
+ void resetToAckedPosition() {
+ currentPosition = getTotalAckDataLength();
+ }
+
+ @VisibleForTesting
+ public OutputStream getOutputStream() {
+ return this.outputStream;
+ }
+
+ @VisibleForTesting
+ public BlockID getBlockID() {
+ return this.blockID;
+ }
+
+ /**
+ * During writes a block ID might change as BCSID's are increasing.
+ * Implementors might account these changes, and return a different block id
+ * here.
+ * @param id the last known ID of the block.
+ */
+ void updateBlockID(BlockID id) {
+ this.blockID = id;
+ }
+
+ OzoneClientConfig getConf(){
+ return this.config;
+ }
+
+ XceiverClientFactory getXceiverClientManager() {
+ return this.xceiverClientManager;
+ }
+
+ /**
+ * Gets the original Pipeline this entry is initialized with.
+ * @return the original pipeline
+ */
+ @VisibleForTesting
+ public Pipeline getPipeline() {
+ return this.pipeline;
+ }
+
+ /**
+ * Gets the Pipeline based on which the location report can be sent to the OM.
+ * This is necessary, as implementors might use special pipeline information
+ * that can be created during commit, but not during initialization,
+ * and might need to update some Pipeline information returned in
+ * OMKeyLocationInfo.
+ * @return the Pipeline to be reported to the OM in the key location info.
+ */
+ Pipeline getPipelineForOMLocationReport(){
+ return getPipeline();
+ }
+
+ long getCurrentPosition() {
+ return this.currentPosition;
+ }
+
+ BufferPool getBufferPool() {
+ return this.bufferPool;
}
/**
@@ -214,8 +342,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
}
public Builder setXceiverClientManager(
- XceiverClientFactory
- xClientManager) {
+ XceiverClientFactory xClientManager) {
this.xceiverClientManager = xClientManager;
return this;
}
@@ -257,39 +384,6 @@ public final class BlockOutputStreamEntry extends OutputStream {
token, config);
}
}
-
- @VisibleForTesting
- public OutputStream getOutputStream() {
- return outputStream;
- }
-
- public BlockID getBlockID() {
- return blockID;
- }
-
- public String getKey() {
- return key;
- }
-
- public XceiverClientFactory getXceiverClientManager() {
- return xceiverClientManager;
- }
-
- public Pipeline getPipeline() {
- return pipeline;
- }
-
- public long getCurrentPosition() {
- return currentPosition;
- }
-
- public BufferPool getBufferPool() {
- return bufferPool;
- }
-
- public void setCurrentPosition(long curPosition) {
- this.currentPosition = curPosition;
- }
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index e9147a8..38f0aa8 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -44,21 +44,37 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * This class manages the stream entries list and handles block allocation
- * from OzoneManager.
+ * A BlockOutputStreamEntryPool manages the communication with OM during writing
+ * a Key to Ozone with {@link KeyOutputStream}.
+ * Block allocation, handling of pre-allocated blocks, and managing stream
+ * entries that represent a writing channel towards DataNodes are the main
+ * responsibility of this class.
*/
public class BlockOutputStreamEntryPool {
public static final Logger LOG =
LoggerFactory.getLogger(BlockOutputStreamEntryPool.class);
+ /**
+ * List of stream entries that are used to write a block of data.
+ */
private final List<BlockOutputStreamEntry> streamEntries;
private final OzoneClientConfig config;
+ /**
+ * The index of the stream entry we are currently writing into. Note that a
+ * stream entry is allowed to manage multiple streams, as for example in the EC
+ * write case, where an entry represents an EC block group.
+ */
private int currentStreamIndex;
private final OzoneManagerProtocol omClient;
private final OmKeyArgs keyArgs;
private final XceiverClientFactory xceiverClientFactory;
private final String requestID;
+ /**
+ * A {@link BufferPool} shared between all
+ * {@link org.apache.hadoop.hdds.scm.storage.BlockOutputStream}s managed by
+ * the entries in the pool.
+ */
private final BufferPool bufferPool;
private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
private final long openID;
@@ -145,9 +161,17 @@ public class BlockOutputStreamEntryPool {
}
}
- private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
- Preconditions.checkNotNull(subKeyInfo.getPipeline());
- BlockOutputStreamEntry.Builder builder =
+ /**
+ * Method to create a stream entry instance based on the
+ * {@link OmKeyLocationInfo}.
+ * If implementations require additional data to create the entry, they need
+ * to get that data before starting to create entries.
+ * @param subKeyInfo the {@link OmKeyLocationInfo} object that describes the
+ * key to be written.
+ * @return a BlockOutputStreamEntry instance that handles how data is written.
+ */
+ BlockOutputStreamEntry createStreamEntry(OmKeyLocationInfo subKeyInfo) {
+ return
new BlockOutputStreamEntry.Builder()
.setBlockID(subKeyInfo.getBlockID())
.setKey(keyArgs.getKeyName())
@@ -156,22 +180,45 @@ public class BlockOutputStreamEntryPool {
.setConfig(config)
.setLength(subKeyInfo.getLength())
.setBufferPool(bufferPool)
- .setToken(subKeyInfo.getToken());
- streamEntries.add(builder.build());
+ .setToken(subKeyInfo.getToken())
+ .build();
+ }
+
+ private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo) {
+ Preconditions.checkNotNull(subKeyInfo.getPipeline());
+ streamEntries.add(createStreamEntry(subKeyInfo));
}
- public List<OmKeyLocationInfo> getLocationInfoList() {
+ /**
+ * Returns the list of {@link OmKeyLocationInfo} objects that describe to OM
+ * where the blocks of the key have been written.
+ * @return the location info list of written blocks.
+ */
+ @VisibleForTesting
+ public List<OmKeyLocationInfo> getLocationInfoList() {
+ List<OmKeyLocationInfo> locationInfoList;
+ List<OmKeyLocationInfo> currBlocksLocationInfoList =
+ getOmKeyLocationInfos(streamEntries);
+ locationInfoList = currBlocksLocationInfoList;
+ return locationInfoList;
+ }
+
+ private List<OmKeyLocationInfo> getOmKeyLocationInfos(
+ List<BlockOutputStreamEntry> streams) {
List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
- for (BlockOutputStreamEntry streamEntry : streamEntries) {
+ for (BlockOutputStreamEntry streamEntry : streams) {
long length = streamEntry.getCurrentPosition();
// Commit only those blocks to OzoneManager which are not empty
if (length != 0) {
OmKeyLocationInfo info =
- new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID())
- .setLength(streamEntry.getCurrentPosition()).setOffset(0)
+ new OmKeyLocationInfo.Builder()
+ .setBlockID(streamEntry.getBlockID())
+ .setLength(streamEntry.getCurrentPosition())
+ .setOffset(0)
.setToken(streamEntry.getToken())
- .setPipeline(streamEntry.getPipeline()).build();
+ .setPipeline(streamEntry.getPipelineForOMLocationReport())
+ .build();
locationInfoList.add(info);
}
if (LOG.isDebugEnabled()) {
@@ -185,6 +232,19 @@ public class BlockOutputStreamEntryPool {
}
/**
+ * Retrieves the {@link BufferPool} instance shared between managed block
+ * output stream entries.
+ * @return the shared buffer pool.
+ */
+ public BufferPool getBufferPool() {
+ return this.bufferPool;
+ }
+
+ OzoneClientConfig getConfig() {
+ return config;
+ }
+
+ /**
* Discards the subsequent pre allocated blocks and removes the streamEntries
* from the streamEntries list for the container which is closed.
* @param containerID id of the closed container
@@ -211,10 +271,12 @@ public class BlockOutputStreamEntryPool {
}
}
+ @VisibleForTesting
List<BlockOutputStreamEntry> getStreamEntries() {
return streamEntries;
}
+ @VisibleForTesting
XceiverClientFactory getXceiverClientFactory() {
return xceiverClientFactory;
}
@@ -224,8 +286,8 @@ public class BlockOutputStreamEntryPool {
}
long getKeyLength() {
- return streamEntries.stream().mapToLong(
- BlockOutputStreamEntry::getCurrentPosition).sum();
+ return streamEntries.stream()
+ .mapToLong(BlockOutputStreamEntry::getCurrentPosition).sum();
}
/**
* Contact OM to get a new block. Set the new block with the index (e.g.
@@ -244,7 +306,16 @@ public class BlockOutputStreamEntryPool {
addKeyLocationInfo(subKeyInfo);
}
-
+ /**
+ * Commits the keys with Ozone Manager(s).
+ * At the end of the write committing the key from client side lets the OM
+ * know that the data has been written and to where. With this info OM can
+ * register the metadata stored in OM and SCM about the key that was written.
+ * @param offset the offset on which the key writer stands at the time of
+ * finishing data writes. (Has to be equal and checked against
+ * the actual length written by the stream entries.)
+ * @throws IOException in case there is an I/O problem during communication.
+ */
void commitKey(long offset) throws IOException {
if (keyArgs != null) {
// in test, this could be null
@@ -266,7 +337,7 @@ public class BlockOutputStreamEntryPool {
}
}
- public BlockOutputStreamEntry getCurrentStreamEntry() {
+ BlockOutputStreamEntry getCurrentStreamEntry() {
if (streamEntries.isEmpty() || streamEntries.size() <= currentStreamIndex) {
return null;
} else {
@@ -274,6 +345,12 @@ public class BlockOutputStreamEntryPool {
}
}
+ /**
+ * Allocates a new block with OM if the current stream is closed, and new
+ * writes are to be handled.
+ * @return the new current open stream to write to
+ * @throws IOException if the block allocation failed.
+ */
BlockOutputStreamEntry allocateBlockIfNeeded() throws IOException {
BlockOutputStreamEntry streamEntry = getCurrentStreamEntry();
if (streamEntry != null && streamEntry.isClosed()) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 1e13c38..d5f6f5d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -299,8 +299,7 @@ public class KeyOutputStream extends OutputStream {
Pipeline pipeline = streamEntry.getPipeline();
PipelineID pipelineId = pipeline.getId();
long totalSuccessfulFlushedData = streamEntry.getTotalAckDataLength();
- //set the correct length for the current stream
- streamEntry.setCurrentPosition(totalSuccessfulFlushedData);
+ streamEntry.resetToAckedPosition();
long bufferedDataLen = blockOutputStreamEntryPool.computeBufferData();
if (containerExclusionException) {
LOG.debug(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org