Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/03/11 17:46:47 UTC
[hadoop] branch trunk updated: HDDS-1173. Fix a data corruption bug in BlockOutputStream. Contributed by Shashikant Banerjee.
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new b4aa24d HDDS-1173. Fix a data corruption bug in BlockOutputStream. Contributed by Shashikant Banerjee.
b4aa24d is described below
commit b4aa24d3c5ad1b9309a58795e4b48e567695c4e4
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Mon Mar 11 23:15:49 2019 +0530
HDDS-1173. Fix a data corruption bug in BlockOutputStream. Contributed by Shashikant Banerjee.
---
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 232 +++++++++------------
.../apache/hadoop/hdds/scm/storage/BufferPool.java | 106 ++++++++++
.../ozone/client/io/BlockOutputStreamEntry.java | 23 +-
.../hadoop/ozone/client/io/KeyOutputStream.java | 21 +-
.../rpc/TestCloseContainerHandlingByClient.java | 62 ++++--
.../commandhandler/TestBlockDeletion.java | 2 +-
6 files changed, 279 insertions(+), 167 deletions(-)
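
At its core, the patch replaces BlockOutputStream's raw List<ByteBuffer> and hand-rolled currentBufferIndex bookkeeping with a dedicated BufferPool of chunk-sized buffers that are explicitly allocated, released, and recycled. The sketch below is not part of the commit; it uses made-up sizes purely to illustrate the sizing invariants that KeyOutputStream enforces further down in this diff:

    // Hypothetical driver code; sizes chosen only for illustration.
    import org.apache.hadoop.hdds.scm.storage.BufferPool;

    public class PoolSizingSketch {
      public static void main(String[] args) {
        long chunkSize = 4L * 1024 * 1024;               // 4 MB
        long streamBufferFlushSize = 16L * 1024 * 1024;  // multiple of chunkSize
        long streamBufferMaxSize = 32L * 1024 * 1024;    // multiple of flush size
        // KeyOutputStream checks exactly these relationships:
        assert streamBufferFlushSize % chunkSize == 0;
        assert streamBufferMaxSize % streamBufferFlushSize == 0;
        // One shared pool of chunk-sized buffers per key: 32 MB / 4 MB = 8
        // buffers, reused across BlockOutputStreamEntry instances.
        BufferPool bufferPool = new BufferPool(
            (int) chunkSize, (int) (streamBufferMaxSize / chunkSize));
        System.out.println("pool capacity: "
            + (streamBufferMaxSize / chunkSize) + " buffers");
      }
    }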
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 2e156b3..fe41f57 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.UUID;
@@ -87,7 +86,7 @@ public class BlockOutputStream extends OutputStream {
private final long streamBufferFlushSize;
private final long streamBufferMaxSize;
private final long watchTimeout;
- private List<ByteBuffer> bufferList;
+ private BufferPool bufferPool;
// The IOException will be set by response handling thread in case there is an
// exception received in the response. If the exception is set, the next
// request will fail upfront.
@@ -111,8 +110,6 @@ public class BlockOutputStream extends OutputStream {
// map containing mapping for putBlock logIndex to flushedDataLength Map.
private ConcurrentHashMap<Long, Long> commitIndex2flushedDataMap;
- private int currentBufferIndex;
-
private List<DatanodeDetails> failedServers;
/**
@@ -124,7 +121,7 @@ public class BlockOutputStream extends OutputStream {
* @param pipeline pipeline where block will be written
* @param traceID container protocol call args
* @param chunkSize chunk size
- * @param bufferList list of byte buffers
+ * @param bufferPool pool of buffers
* @param streamBufferFlushSize flush size
* @param streamBufferMaxSize max size of the currentBuffer
* @param watchTimeout watch timeout
@@ -135,7 +132,7 @@ public class BlockOutputStream extends OutputStream {
public BlockOutputStream(BlockID blockID, String key,
XceiverClientManager xceiverClientManager, Pipeline pipeline,
String traceID, int chunkSize, long streamBufferFlushSize,
- long streamBufferMaxSize, long watchTimeout, List<ByteBuffer> bufferList,
+ long streamBufferMaxSize, long watchTimeout, BufferPool bufferPool,
ChecksumType checksumType, int bytesPerChecksum)
throws IOException {
this.blockID = blockID;
@@ -154,7 +151,7 @@ public class BlockOutputStream extends OutputStream {
this.streamBufferFlushSize = streamBufferFlushSize;
this.streamBufferMaxSize = streamBufferMaxSize;
this.watchTimeout = watchTimeout;
- this.bufferList = bufferList;
+ this.bufferPool = bufferPool;
this.checksumType = checksumType;
this.bytesPerChecksum = bytesPerChecksum;
@@ -164,7 +161,6 @@ public class BlockOutputStream extends OutputStream {
totalAckDataLength = 0;
futureMap = new ConcurrentHashMap<>();
totalDataFlushedLength = 0;
- currentBufferIndex = 0;
writtenDataLength = 0;
failedServers = Collections.emptyList();
}
@@ -181,13 +177,6 @@ public class BlockOutputStream extends OutputStream {
return writtenDataLength;
}
- private long computeBufferData() {
- int dataLength =
- bufferList.stream().mapToInt(Buffer::position).sum();
- Preconditions.checkState(dataLength <= streamBufferMaxSize);
- return dataLength;
- }
-
public List<DatanodeDetails> getFailedServers() {
return failedServers;
}
@@ -202,6 +191,7 @@ public class BlockOutputStream extends OutputStream {
@Override
public void write(byte[] b, int off, int len) throws IOException {
+ checkOpen();
if (b == null) {
throw new NullPointerException();
}
@@ -213,53 +203,40 @@ public class BlockOutputStream extends OutputStream {
return;
}
while (len > 0) {
- checkOpen();
int writeLen;
- allocateBuffer();
- ByteBuffer currentBuffer = getCurrentBuffer();
+
+ // Allocate a buffer if needed. The buffer will be allocated only
+ // once as needed and will be reused for multiple BlockOutputStream
+ // entries.
+ ByteBuffer currentBuffer = bufferPool.allocateBufferIfNeeded();
+ int pos = currentBuffer.position();
writeLen =
- Math.min(chunkSize - currentBuffer.position() % chunkSize, len);
+ Math.min(chunkSize - pos % chunkSize, len);
currentBuffer.put(b, off, writeLen);
- if (currentBuffer.position() % chunkSize == 0) {
- int pos = currentBuffer.position() - chunkSize;
- int limit = currentBuffer.position();
- writeChunk(pos, limit);
+ if (!currentBuffer.hasRemaining()) {
+ writeChunk(currentBuffer);
}
off += writeLen;
len -= writeLen;
writtenDataLength += writeLen;
- if (currentBuffer.position() == streamBufferFlushSize) {
+ if (shouldFlush()) {
totalDataFlushedLength += streamBufferFlushSize;
handlePartialFlush();
}
- long bufferedData = computeBufferData();
- // Data in the bufferList can not exceed streamBufferMaxSize
- if (bufferedData == streamBufferMaxSize) {
+ // Data in the bufferPool cannot exceed streamBufferMaxSize
+ if (isBufferPoolFull()) {
handleFullBuffer();
}
}
}
- private ByteBuffer getCurrentBuffer() {
- ByteBuffer buffer = bufferList.get(currentBufferIndex);
- if (!buffer.hasRemaining()) {
- currentBufferIndex =
- currentBufferIndex < getMaxNumBuffers() - 1 ? ++currentBufferIndex :
- 0;
- }
- return bufferList.get(currentBufferIndex);
- }
-
- private int getMaxNumBuffers() {
- return (int)(streamBufferMaxSize/streamBufferFlushSize);
+ private boolean shouldFlush() {
+ return writtenDataLength % streamBufferFlushSize == 0;
}
- private void allocateBuffer() {
- for (int i = bufferList.size(); i < getMaxNumBuffers(); i++) {
- bufferList.add(ByteBuffer.allocate((int)streamBufferFlushSize));
- }
+ private boolean isBufferPoolFull() {
+ return bufferPool.computeBufferData() == streamBufferMaxSize;
}
-
/**
* Will be called on the retryPath in case closedContainerException/
* TimeoutException.
@@ -272,36 +249,37 @@ public class BlockOutputStream extends OutputStream {
if (len == 0) {
return;
}
- int off = 0;
- int pos = off;
+ int count = 0;
+ Preconditions.checkArgument(len <= streamBufferMaxSize);
while (len > 0) {
long writeLen;
writeLen = Math.min(chunkSize, len);
if (writeLen == chunkSize) {
- int limit = pos + chunkSize;
- writeChunk(pos, limit);
+ writeChunk(bufferPool.getBuffer(count));
}
- off += writeLen;
len -= writeLen;
+ count++;
writtenDataLength += writeLen;
- if (off % streamBufferFlushSize == 0) {
- // reset the position to zero as now we wll readng thhe next buffer in
- // the list
- pos = 0;
+ if (shouldFlush()) {
+ // reset the position to zero as now we will be reading the
+ // next buffer in the list
totalDataFlushedLength += streamBufferFlushSize;
handlePartialFlush();
}
- if (computeBufferData() % streamBufferMaxSize == 0) {
+
+ // We should not call isBufferPoolFull here. The buffer might already be
+ // full, as the whole data is already cached in the buffer. We should just
+ // check whether we wrote data of size streamBufferMaxSize and, if so,
+ // handle the full-buffer condition.
+ if (writtenDataLength == streamBufferMaxSize) {
handleFullBuffer();
}
}
}
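
Both the normal write path and writeOnRetry above now share one flush trigger, shouldFlush(), driven by the running writtenDataLength rather than by a single buffer's position, so it behaves the same whether data arrives byte by byte or is replayed buffer by buffer on retry. A toy illustration, not part of the commit:

    // FlushTriggerDemo: hypothetical sizes, for illustration only.
    public class FlushTriggerDemo {
      public static void main(String[] args) {
        long chunkSize = 4, streamBufferFlushSize = 8, writtenDataLength = 0;
        for (int chunk = 1; chunk <= 4; chunk++) {
          writtenDataLength += chunkSize;    // one full chunk-sized buffer
          boolean shouldFlush =
              writtenDataLength % streamBufferFlushSize == 0;
          System.out.println("chunk " + chunk + " -> flush = " + shouldFlush);
        }
        // prints: chunk 1 -> flush = false, chunk 2 -> flush = true,
        //         chunk 3 -> flush = false, chunk 4 -> flush = true
      }
    }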
/**
- * just update the totalAckDataLength. Since we have allocated
- * the currentBuffer more than the streamBufferMaxSize, we can keep on writing
- * to the currentBuffer. In case of failure, we will read the data starting
- * from totalAckDataLength.
+ * just update the totalAckDataLength. In case of failure,
+ * we will read the data starting from totalAckDataLength.
*/
private void updateFlushIndex(long index) {
if (!commitIndex2flushedDataMap.isEmpty()) {
@@ -310,13 +288,15 @@ public class BlockOutputStream extends OutputStream {
LOG.debug("Total data successfully replicated: " + totalAckDataLength);
futureMap.remove(totalAckDataLength);
// Flush has been committed to required servers successfully.
- // just swap the bufferList head and tail after clearing.
- ByteBuffer currentBuffer = bufferList.remove(0);
- currentBuffer.clear();
- if (currentBufferIndex != 0) {
- currentBufferIndex--;
+ // just release the current buffer from the buffer pool.
+
+ // every entry removed from the putBlock future Map signifies
+ // streamBufferFlushSize/chunkSize number of chunks successfully committed.
+ // Release the buffers from the buffer pool to be reused again.
+ int chunkCount = (int) (streamBufferFlushSize / chunkSize);
+ for (int i = 0; i < chunkCount; i++) {
+ bufferPool.releaseBuffer();
}
- bufferList.add(currentBuffer);
}
}
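
The release accounting above is what makes recycling safe: each acknowledged putBlock covers exactly one flush unit, so streamBufferFlushSize / chunkSize chunk-sized buffers go back to the pool per ack. Continuing the hypothetical sizes from the sketch near the top of this mail (4 MB chunks, 16 MB flush unit):

    // Fragment, for illustration only: one putBlock ack frees
    // 16 MB / 4 MB = 4 buffers for reuse.
    int chunkCount = (int) (streamBufferFlushSize / chunkSize);  // = 4
    for (int i = 0; i < chunkCount; i++) {
      bufferPool.releaseBuffer();  // head buffer cleared and recycled
    }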
@@ -450,91 +430,85 @@ public class BlockOutputStream extends OutputStream {
@Override
public void flush() throws IOException {
if (xceiverClientManager != null && xceiverClient != null
- && bufferList != null) {
- checkOpen();
- int bufferSize = bufferList.size();
- if (bufferSize > 0) {
- try {
- // flush the last chunk data residing on the currentBuffer
- if (totalDataFlushedLength < writtenDataLength) {
- ByteBuffer currentBuffer = getCurrentBuffer();
- int pos = currentBuffer.position() - (currentBuffer.position()
- % chunkSize);
- int limit = currentBuffer.position() - pos;
- writeChunk(pos, currentBuffer.position());
- totalDataFlushedLength += limit;
- handlePartialFlush();
- }
- waitOnFlushFutures();
- // just check again if the exception is hit while waiting for the
- // futures to ensure flush has indeed succeeded
- checkOpen();
- } catch (InterruptedException | ExecutionException e) {
- adjustBuffersOnException();
- throw new IOException(
- "Unexpected Storage Container Exception: " + e.toString(), e);
- }
+ && bufferPool != null && bufferPool.getSize() > 0) {
+ try {
+ handleFlush();
+ } catch (InterruptedException | ExecutionException e) {
+ adjustBuffersOnException();
+ throw new IOException(
+ "Unexpected Storage Container Exception: " + e.toString(), e);
}
}
}
- private void writeChunk(int pos, int limit) throws IOException {
+
+ private void writeChunk(ByteBuffer buffer)
+ throws IOException {
// Please note: we are not flipping the slice when we write, since
// the slices point to the currentBuffer start and end as needed for
// the chunk write. Also note that duplicate() does not create a
// copy of the data; it only creates metadata that points to the data
// stream.
- ByteBuffer chunk = bufferList.get(currentBufferIndex).duplicate();
- chunk.position(pos);
- chunk.limit(limit);
+ ByteBuffer chunk = buffer.duplicate();
+ chunk.position(0);
+ chunk.limit(buffer.position());
writeChunkToContainer(chunk);
}
+ private void handleFlush()
+ throws IOException, InterruptedException, ExecutionException {
+ checkOpen();
+ // flush the last chunk data residing on the currentBuffer
+ if (totalDataFlushedLength < writtenDataLength) {
+ ByteBuffer currentBuffer = bufferPool.getBuffer();
+ int pos = currentBuffer.position();
+ writeChunk(currentBuffer);
+ totalDataFlushedLength += pos;
+ handlePartialFlush();
+ }
+ waitOnFlushFutures();
+ // just check again if the exception is hit while waiting for the
+ // futures to ensure flush has indeed succeeded
+
+ // irrespective of whether the commitIndex2flushedDataMap is empty
+ // or not, ensure there is no exception set
+ checkOpen();
+
+ }
+
@Override
public void close() throws IOException {
if (xceiverClientManager != null && xceiverClient != null
- && bufferList != null) {
- int bufferSize = bufferList.size();
- if (bufferSize > 0) {
- try {
- // flush the last chunk data residing on the currentBuffer
- if (totalDataFlushedLength < writtenDataLength) {
- ByteBuffer currentBuffer = getCurrentBuffer();
- int pos = currentBuffer.position() - (currentBuffer.position()
- % chunkSize);
- int limit = currentBuffer.position() - pos;
- writeChunk(pos, currentBuffer.position());
- totalDataFlushedLength += limit;
- handlePartialFlush();
- }
- waitOnFlushFutures();
- // irrespective of whether the commitIndex2flushedDataMap is empty
- // or not, ensure there is no exception set
- checkOpen();
- if (!commitIndex2flushedDataMap.isEmpty()) {
- // wait for the last commit index in the commitIndex2flushedDataMap
- // to get committed to all or majority of nodes in case timeout
- // happens.
- long lastIndex =
- commitIndex2flushedDataMap.keySet().stream()
- .mapToLong(v -> v).max().getAsLong();
- LOG.debug(
- "waiting for last flush Index " + lastIndex + " to catch up");
- watchForCommit(lastIndex);
- }
- } catch (InterruptedException | ExecutionException e) {
- adjustBuffersOnException();
- throw new IOException(
- "Unexpected Storage Container Exception: " + e.toString(), e);
- } finally {
- cleanup(false);
+ && bufferPool != null && bufferPool.getSize() > 0) {
+ try {
+ handleFlush();
+ if (!commitIndex2flushedDataMap.isEmpty()) {
+ // wait for the last commit index in the commitIndex2flushedDataMap
+ // to get committed to all or majority of nodes in case timeout
+ // happens.
+ long lastIndex =
+ commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v)
+ .max().getAsLong();
+ LOG.debug(
+ "waiting for last flush Index " + lastIndex + " to catch up");
+ watchForCommit(lastIndex);
}
+ } catch (InterruptedException | ExecutionException e) {
+ adjustBuffersOnException();
+ throw new IOException(
+ "Unexpected Storage Container Exception: " + e.toString(), e);
+ } finally {
+ cleanup(false);
}
- // clear the currentBuffer
- bufferList.stream().forEach(ByteBuffer::clear);
+ // TODO: Turn the below buffer empty check on when Standalone pipeline
+ // is removed in the write path in tests
+ // Preconditions.checkArgument(buffer.position() == 0);
+ // bufferPool.checkBufferPoolEmpty();
+
}
}
+
private void waitOnFlushFutures()
throws InterruptedException, ExecutionException {
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
new file mode 100644
index 0000000..541e6bd
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BufferPool.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.base.Preconditions;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This class creates and manages a pool of n buffers.
+ */
+public class BufferPool {
+
+ private List<ByteBuffer> bufferList;
+ private int currentBufferIndex;
+ private final int bufferSize;
+ private final int capacity;
+
+ public BufferPool(int bufferSize, int capacity) {
+ this.capacity = capacity;
+ this.bufferSize = bufferSize;
+ bufferList = new ArrayList<>(capacity);
+ currentBufferIndex = -1;
+ }
+
+ public ByteBuffer getBuffer() {
+ return currentBufferIndex == -1 ? null : bufferList.get(currentBufferIndex);
+ }
+
+ /**
+ * If currentBufferIndex is less than bufferList.size() - 1, the next
+ * buffer in the list has already been freed up for rewriting; reuse
+ * that buffer in such cases.
+ *
+ * If currentBufferIndex == bufferList.size() - 1 and the list is still
+ * smaller than the configured capacity, allocate a new buffer of chunk
+ * size.
+ *
+ */
+ public ByteBuffer allocateBufferIfNeeded() {
+ ByteBuffer buffer = getBuffer();
+ if (buffer != null && buffer.hasRemaining()) {
+ return buffer;
+ }
+ if (currentBufferIndex < bufferList.size() - 1) {
+ buffer = getBuffer(currentBufferIndex + 1);
+ } else {
+ buffer = ByteBuffer.allocate(bufferSize);
+ bufferList.add(buffer);
+ }
+ Preconditions.checkArgument(bufferList.size() <= capacity);
+ currentBufferIndex++;
+ // TODO: Turn the below precondition check on when Standalone pipeline
+ // is removed in the write path in tests
+ // Preconditions.checkArgument(buffer.position() == 0);
+ return buffer;
+ }
+
+ public void releaseBuffer() {
+ // always remove from the head of the list and append at the tail
+ ByteBuffer buffer = bufferList.remove(0);
+ buffer.clear();
+ bufferList.add(buffer);
+ currentBufferIndex--;
+ }
+
+ public void clearBufferPool() {
+ bufferList.clear();
+ currentBufferIndex = -1;
+ }
+
+ public void checkBufferPoolEmpty() {
+ Preconditions.checkArgument(computeBufferData() == 0);
+ }
+
+ public long computeBufferData() {
+ return bufferList.stream().mapToInt(value -> value.position())
+ .sum();
+ }
+
+ public int getSize() {
+ return bufferList.size();
+ }
+
+ ByteBuffer getBuffer(int index) {
+ return bufferList.get(index);
+ }
+
+}
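
Taken together, BufferPool behaves like a small ring of chunk-sized ByteBuffers: the write path fills the current buffer and moves forward, and every acknowledged flush clears the head buffer and appends it at the tail for reuse. A minimal self-contained usage sketch (hypothetical driver code, not part of the commit):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.hdds.scm.storage.BufferPool;

    public class BufferPoolDemo {
      public static void main(String[] args) {
        int chunkSize = 4;                               // toy chunk size in bytes
        BufferPool pool = new BufferPool(chunkSize, 2);  // capacity: 2 buffers

        // Write path: buffers are allocated lazily and filled chunk by chunk.
        ByteBuffer first = pool.allocateBufferIfNeeded();
        first.put(new byte[chunkSize]);                  // first buffer is now full
        ByteBuffer second = pool.allocateBufferIfNeeded();
        second.put(new byte[chunkSize]);                 // pool is now full

        System.out.println(pool.computeBufferData());    // prints 8

        // Ack path: each release removes the head buffer, clears it, and
        // appends it at the tail, ready for the next allocateBufferIfNeeded().
        pool.releaseBuffer();
        pool.releaseBuffer();
        System.out.println(pool.computeBufferData());    // prints 0
      }
    }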
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 16a825f..b6de8ab 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.ozone.client.io;
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.List;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -29,6 +27,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -58,14 +57,14 @@ public final class BlockOutputStreamEntry extends OutputStream {
private final long streamBufferFlushSize;
private final long streamBufferMaxSize;
private final long watchTimeout;
- private List<ByteBuffer> bufferList;
+ private BufferPool bufferPool;
@SuppressWarnings("parameternumber")
private BlockOutputStreamEntry(BlockID blockID, String key,
XceiverClientManager xceiverClientManager,
Pipeline pipeline, String requestId, int chunkSize,
long length, long streamBufferFlushSize, long streamBufferMaxSize,
- long watchTimeout, List<ByteBuffer> bufferList,
+ long watchTimeout, BufferPool bufferPool,
ChecksumType checksumType, int bytesPerChecksum,
Token<OzoneBlockTokenIdentifier> token) {
this.outputStream = null;
@@ -81,7 +80,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
this.streamBufferFlushSize = streamBufferFlushSize;
this.streamBufferMaxSize = streamBufferMaxSize;
this.watchTimeout = watchTimeout;
- this.bufferList = bufferList;
+ this.bufferPool = bufferPool;
this.checksumType = checksumType;
this.bytesPerChecksum = bytesPerChecksum;
}
@@ -112,7 +111,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
this.outputStream =
new BlockOutputStream(blockID, key, xceiverClientManager,
pipeline, requestId, chunkSize, streamBufferFlushSize,
- streamBufferMaxSize, watchTimeout, bufferList, checksumType,
+ streamBufferMaxSize, watchTimeout, bufferPool, checksumType,
bytesPerChecksum);
}
}
@@ -212,7 +211,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
private long streamBufferFlushSize;
private long streamBufferMaxSize;
private long watchTimeout;
- private List<ByteBuffer> bufferList;
+ private BufferPool bufferPool;
private Token<OzoneBlockTokenIdentifier> token;
private ChecksumType checksumType;
private int bytesPerChecksum;
@@ -278,8 +277,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
return this;
}
- public Builder setBufferList(List<ByteBuffer> bffrLst) {
- this.bufferList = bffrLst;
+ public Builder setbufferPool(BufferPool pool) {
+ this.bufferPool = pool;
return this;
}
@@ -292,7 +291,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
return new BlockOutputStreamEntry(blockID, key,
xceiverClientManager, pipeline, requestId, chunkSize,
length, streamBufferFlushSize, streamBufferMaxSize, watchTimeout,
- bufferList, checksumType, bytesPerChecksum, token);
+ bufferPool, checksumType, bytesPerChecksum, token);
}
}
@@ -340,8 +339,8 @@ public final class BlockOutputStreamEntry extends OutputStream {
return watchTimeout;
}
- public List<ByteBuffer> getBufferList() {
- return bufferList;
+ public BufferPool getBufferPool() {
+ return bufferPool;
}
public void setCurrentPosition(long curPosition) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 7035c73..d1acbe1 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -45,7 +46,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Collection;
@@ -83,7 +83,7 @@ public class KeyOutputStream extends OutputStream {
private final long blockSize;
private final int bytesPerChecksum;
private final ChecksumType checksumType;
- private List<ByteBuffer> bufferList;
+ private final BufferPool bufferPool;
private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
private FileEncryptionInfo feInfo;
private ExcludeList excludeList;
@@ -104,9 +104,7 @@ public class KeyOutputStream extends OutputStream {
closed = false;
streamBufferFlushSize = 0;
streamBufferMaxSize = 0;
- bufferList = new ArrayList<>(1);
- ByteBuffer buffer = ByteBuffer.allocate(1);
- bufferList.add(buffer);
+ bufferPool = new BufferPool(chunkSize, 1);
watchTimeout = 0;
blockSize = 0;
this.checksumType = ChecksumType.valueOf(
@@ -182,7 +180,8 @@ public class KeyOutputStream extends OutputStream {
Preconditions.checkState(streamBufferFlushSize % chunkSize == 0);
Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0);
Preconditions.checkState(blockSize % streamBufferMaxSize == 0);
- this.bufferList = new ArrayList<>();
+ this.bufferPool =
+ new BufferPool(chunkSize, (int)streamBufferMaxSize / chunkSize);
this.excludeList = new ExcludeList();
}
@@ -228,7 +227,7 @@ public class KeyOutputStream extends OutputStream {
.setStreamBufferFlushSize(streamBufferFlushSize)
.setStreamBufferMaxSize(streamBufferMaxSize)
.setWatchTimeout(watchTimeout)
- .setBufferList(bufferList)
+ .setbufferPool(bufferPool)
.setChecksumType(checksumType)
.setBytesPerChecksum(bytesPerChecksum)
.setToken(subKeyInfo.getToken());
@@ -272,8 +271,7 @@ public class KeyOutputStream extends OutputStream {
}
private long computeBufferData() {
- return bufferList.stream().mapToInt(value -> value.position())
- .sum();
+ return bufferPool.computeBufferData();
}
private void handleWrite(byte[] b, int off, long len, boolean retry)
@@ -580,10 +578,7 @@ public class KeyOutputStream extends OutputStream {
} catch (IOException ioe) {
throw ioe;
} finally {
- if (bufferList != null) {
- bufferList.stream().forEach(e -> e.clear());
- }
- bufferList = null;
+ bufferPool.clearBufferPool();
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
index bf4f4d4..396351d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
@@ -87,6 +87,7 @@ public class TestCloseContainerHandlingByClient {
conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+ conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
conf.setQuietMode(false);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
StorageUnit.MB);
@@ -133,7 +134,7 @@ public class TestCloseContainerHandlingByClient {
.setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
.build();
- waitForContainerClose(keyName, key);
+ waitForContainerClose(key);
key.write(data);
key.flush();
key.close();
@@ -166,7 +167,7 @@ public class TestCloseContainerHandlingByClient {
.setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
.build();
- waitForContainerClose(keyName, key);
+ waitForContainerClose(key);
key.close();
// read the key from OM again and match the length. The length will still
// be equal to the original data size.
@@ -180,12 +181,12 @@ public class TestCloseContainerHandlingByClient {
String keyName = getKeyName();
OzoneOutputStream key =
- createKey(keyName, ReplicationType.RATIS, (4 * blockSize));
+ createKey(keyName, ReplicationType.RATIS, (3 * blockSize));
KeyOutputStream keyOutputStream =
(KeyOutputStream) key.getOutputStream();
- // With the initial size provided, it should have preallocated 4 blocks
- Assert.assertEquals(4, keyOutputStream.getStreamEntries().size());
- // write data more than 1 chunk
+ // With the initial size provided, it should have preallocated 3 blocks
+ Assert.assertEquals(3, keyOutputStream.getStreamEntries().size());
+ // write data more than 1 block
byte[] data =
ContainerTestHelper.getFixedLengthString(keyString, (3 * blockSize))
.getBytes(UTF_8);
@@ -199,7 +200,7 @@ public class TestCloseContainerHandlingByClient {
.setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
.build();
- waitForContainerClose(keyName, key);
+ waitForContainerClose(key);
// write 1 more block worth of data. It will fail and new block will be
// allocated
key.write(ContainerTestHelper.getFixedLengthString(keyString, blockSize)
@@ -258,7 +259,7 @@ public class TestCloseContainerHandlingByClient {
.setFactor(HddsProtos.ReplicationFactor.THREE).setKeyName(keyName)
.build();
- waitForContainerClose(keyName, key);
+ waitForContainerClose(key);
key.close();
// read the key from OM again and match the length. The length will still
// be equal to the original data size.
.setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
.build();
- waitForContainerClose(keyName, key);
+ waitForContainerClose(key);
// write 3 more chunks worth of data. It will fail and new block will be
// allocated. This write completes 4 blocks worth of data written to key
data = Arrays.copyOfRange(writtenData, 3 * blockSize + chunkSize, keyLen);
@@ -330,8 +331,7 @@ public class TestCloseContainerHandlingByClient {
Assert.assertEquals(4 * blockSize, length);
}
- private void waitForContainerClose(String keyName,
- OzoneOutputStream outputStream)
+ private void waitForContainerClose(OzoneOutputStream outputStream)
throws Exception {
KeyOutputStream keyOutputStream =
(KeyOutputStream) outputStream.getOutputStream();
@@ -375,7 +375,7 @@ public class TestCloseContainerHandlingByClient {
.getPipeline(container.getPipelineID());
List<DatanodeDetails> datanodes = pipeline.getNodes();
Assert.assertEquals(1, datanodes.size());
- waitForContainerClose(keyName, key);
+ waitForContainerClose(key);
dataString =
ContainerTestHelper.getFixedLengthString(keyString, (1 * blockSize));
data = dataString.getBytes(UTF_8);
@@ -421,7 +421,7 @@ public class TestCloseContainerHandlingByClient {
.build();
Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
- waitForContainerClose(keyName, key);
+ waitForContainerClose(key);
// Again Write the Data. This will throw an exception which will be handled
// and new blocks will be allocated
key.write(data);
@@ -435,4 +435,42 @@ public class TestCloseContainerHandlingByClient {
Assert.assertEquals(2 * data.length, keyInfo.getDataSize());
validateData(keyName, dataString.getBytes(UTF_8));
}
+
+ @Test
+ public void testBlockWrites() throws Exception {
+ String keyName = getKeyName();
+ OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+ // write data more than 1 chunk
+ byte[] data1 =
+ ContainerTestHelper.getFixedLengthString(keyString, 2 * chunkSize)
+ .getBytes(UTF_8);
+ key.write(data1);
+
+ Assert.assertTrue(key.getOutputStream() instanceof KeyOutputStream);
+ // get the name of a valid container
+ OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+ .setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+ .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
+ .build();
+
+ waitForContainerClose(key);
+ byte[] data2 =
+ ContainerTestHelper.getFixedLengthString(keyString, 3 * chunkSize)
+ .getBytes(UTF_8);
+ key.write(data2);
+ key.flush();
+ key.close();
+ // read the key from OM again and match the length. The length will still
+ // be equal to the original data size.
+ OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+ Assert.assertEquals(5 * chunkSize, keyInfo.getDataSize());
+
+ // concatenate the two writes to rebuild the expected key contents
+ String dataString = new String(data1, UTF_8);
+ String dataString2 = new String(data2, UTF_8);
+ dataString = dataString.concat(dataString2);
+ validateData(keyName, dataString.getBytes(UTF_8));
+ }
+
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index 78f7d29..41d87a2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -142,7 +142,7 @@ public class TestBlockDeletion {
OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
.setBucketName(bucketName).setKeyName(keyName).setDataSize(0)
- .setType(HddsProtos.ReplicationType.STAND_ALONE)
+ .setType(HddsProtos.ReplicationType.RATIS)
.setFactor(HddsProtos.ReplicationFactor.ONE).build();
List<OmKeyLocationInfoGroup> omKeyLocationInfoGroupList =
om.lookupKey(keyArgs).getKeyLocationVersions();