You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/04/11 14:02:00 UTC
[hadoop] branch trunk updated: HDDS-1348. Refactor BlockOutpuStream
Class. Contributed by Shashikant Banerjee.
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new a0468c5 HDDS-1348. Refactor BlockOutpuStream Class. Contributed by Shashikant Banerjee.
a0468c5 is described below
commit a0468c5756054a0ebe83d256efaea20e3003cd92
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Thu Apr 11 19:31:26 2019 +0530
HDDS-1348. Refactor BlockOutpuStream Class. Contributed by Shashikant Banerjee.
---
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 135 ++--------
.../hadoop/hdds/scm/storage/CommitWatcher.java | 237 +++++++++++++++++
.../hadoop/ozone/client/rpc/TestCommitWatcher.java | 296 +++++++++++++++++++++
3 files changed, 558 insertions(+), 110 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 13c4a0c..139f494 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -48,16 +48,12 @@ import java.util.UUID;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
.putBlockAsync;
@@ -97,7 +93,6 @@ public class BlockOutputStream extends OutputStream {
private int chunkSize;
private final long streamBufferFlushSize;
private final long streamBufferMaxSize;
- private final long watchTimeout;
private BufferPool bufferPool;
// The IOException will be set by response handling thread in case there is an
// exception received in the response. If the exception is set, the next
@@ -111,10 +106,6 @@ public class BlockOutputStream extends OutputStream {
// effective data write attempted so far for the block
private long writtenDataLength;
- // total data which has been successfully flushed and acknowledged
- // by all servers
- private long totalAckDataLength;
-
// List containing buffers for which the putBlock call will
// update the length in the datanodes. This list will just maintain
// references to the buffers in the BufferPool which will be cleared
@@ -123,17 +114,10 @@ public class BlockOutputStream extends OutputStream {
// which got written between successive putBlock calls.
private List<ByteBuffer> bufferList;
- // future Map to hold up all putBlock futures
- private ConcurrentHashMap<Long,
- CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
- futureMap;
-
- // The map should maintain the keys (logIndexes) in order so that while
- // removing we always end up updating incremented data flushed length.
+ // This object will maintain the commitIndexes and byteBufferList in order
// Also, corresponding to the logIndex, the corresponding list of buffers will
// be released from the buffer pool.
- private ConcurrentSkipListMap<Long, List<ByteBuffer>>
- commitIndex2flushedDataMap;
+ private final CommitWatcher commitWatcher;
private List<DatanodeDetails> failedServers;
@@ -175,20 +159,17 @@ public class BlockOutputStream extends OutputStream {
this.chunkIndex = 0;
this.streamBufferFlushSize = streamBufferFlushSize;
this.streamBufferMaxSize = streamBufferMaxSize;
- this.watchTimeout = watchTimeout;
this.bufferPool = bufferPool;
this.checksumType = checksumType;
this.bytesPerChecksum = bytesPerChecksum;
// A single thread executor handle the responses of async requests
responseExecutor = Executors.newSingleThreadExecutor();
- commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();
- totalAckDataLength = 0;
- futureMap = new ConcurrentHashMap<>();
+ commitWatcher = new CommitWatcher(bufferPool, xceiverClient, watchTimeout);
+ bufferList = null;
totalDataFlushedLength = 0;
writtenDataLength = 0;
failedServers = Collections.emptyList();
- bufferList = null;
ioException = new AtomicReference<>(null);
}
@@ -198,7 +179,7 @@ public class BlockOutputStream extends OutputStream {
}
public long getTotalAckDataLength() {
- return totalAckDataLength;
+ return commitWatcher.getTotalAckDataLength();
}
public long getWrittenDataLength() {
@@ -230,7 +211,7 @@ public class BlockOutputStream extends OutputStream {
@VisibleForTesting
public Map<Long, List<ByteBuffer>> getCommitIndex2flushedDataMap() {
- return commitIndex2flushedDataMap;
+ return commitWatcher.getCommitIndex2flushedDataMap();
}
@Override
@@ -334,34 +315,6 @@ public class BlockOutputStream extends OutputStream {
}
/**
- * just update the totalAckDataLength. In case of failure,
- * we will read the data starting from totalAckDataLength.
- */
- private void updateFlushIndex(List<Long> indexes) {
- Preconditions.checkArgument(!commitIndex2flushedDataMap.isEmpty());
- for (long index : indexes) {
- Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index));
- List<ByteBuffer> buffers = commitIndex2flushedDataMap.remove(index);
- long length = buffers.stream().mapToLong(value -> {
- int pos = value.position();
- Preconditions.checkArgument(pos <= chunkSize);
- return pos;
- }).sum();
- // totalAckDataLength replicated yet should always be incremented
- // with the current length being returned from commitIndex2flushedDataMap.
- totalAckDataLength += length;
- LOG.debug("Total data successfully replicated: " + totalAckDataLength);
- futureMap.remove(totalAckDataLength);
- // Flush has been committed to required servers successful.
- // just release the current buffer from the buffer pool corresponding
- // to the buffers that have been committed with the putBlock call.
- for (ByteBuffer byteBuffer : buffers) {
- bufferPool.releaseBuffer(byteBuffer);
- }
- }
- }
-
- /**
* This is a blocking call. It will wait for the flush till the commit index
* at the head of the commitIndex2flushedDataMap gets replicated to all or
* majority.
@@ -370,7 +323,7 @@ public class BlockOutputStream extends OutputStream {
private void handleFullBuffer() throws IOException {
try {
checkOpen();
- if (!futureMap.isEmpty()) {
+ if (!commitWatcher.getFutureMap().isEmpty()) {
waitOnFlushFutures();
}
} catch (InterruptedException | ExecutionException e) {
@@ -378,47 +331,31 @@ public class BlockOutputStream extends OutputStream {
adjustBuffersOnException();
throw getIoException();
}
- if (!commitIndex2flushedDataMap.isEmpty()) {
- watchForCommit(
- commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v)
- .min().getAsLong());
- }
+ watchForCommit(true);
}
- private void adjustBuffers(long commitIndex) {
- List<Long> keyList = commitIndex2flushedDataMap.keySet().stream()
- .filter(p -> p <= commitIndex).collect(Collectors.toList());
- if (keyList.isEmpty()) {
- return;
- } else {
- updateFlushIndex(keyList);
- }
- }
// It may happen that once the exception is encountered , we still might
// have successfully flushed up to a certain index. Make sure the buffers
// only contain data which have not been sufficiently replicated
private void adjustBuffersOnException() {
- adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
+ commitWatcher.releaseBuffersOnException();
}
/**
* calls watchForCommit API of the Ratis Client. For Standalone client,
* it is a no op.
- * @param commitIndex log index to watch for
+ * @param bufferFull flag indicating whether bufferFull condition is hit or
+ * it's called as part of flush/close
* @return minimum commit index replicated to all nodes
* @throws IOException IOException in case watch gets timed out
*/
- private void watchForCommit(long commitIndex) throws IOException {
+ private void watchForCommit(boolean bufferFull) throws IOException {
checkOpen();
- Preconditions.checkState(!commitIndex2flushedDataMap.isEmpty());
- long index;
try {
- XceiverClientReply reply =
- xceiverClient.watchForCommit(commitIndex, watchTimeout);
- if (reply == null) {
- index = 0;
- } else {
+ XceiverClientReply reply = bufferFull ?
+ commitWatcher.watchOnFirstIndex() : commitWatcher.watchOnLastIndex();
+ if (reply != null) {
List<DatanodeDetails> dnList = reply.getDatanodes();
if (!dnList.isEmpty()) {
if (failedServers.isEmpty()) {
@@ -426,13 +363,9 @@ public class BlockOutputStream extends OutputStream {
}
failedServers.addAll(dnList);
}
- index = reply.getLogIndex();
}
- adjustBuffers(index);
- } catch (TimeoutException | InterruptedException | ExecutionException e) {
- LOG.warn("watchForCommit failed for index " + commitIndex, e);
- setIoException(e);
- adjustBuffersOnException();
+ } catch (IOException ioe) {
+ setIoException(ioe);
throw getIoException();
}
}
@@ -471,14 +404,14 @@ public class BlockOutputStream extends OutputStream {
blockID = responseBlockID;
LOG.debug(
"Adding index " + asyncReply.getLogIndex() + " commitMap size "
- + commitIndex2flushedDataMap.size() + " flushLength "
+ + commitWatcher.getCommitInfoMapSize() + " flushLength "
+ flushPos + " numBuffers " + byteBufferList.size()
+ " blockID " + blockID + " bufferPool size" + bufferPool
.getSize() + " currentBufferIndex " + bufferPool
.getCurrentBufferIndex());
// for standalone protocol, logIndex will always be 0.
- commitIndex2flushedDataMap
- .put(asyncReply.getLogIndex(), byteBufferList);
+ commitWatcher
+ .updateCommitInfoMap(asyncReply.getLogIndex(), byteBufferList);
}
return e;
}, responseExecutor).exceptionally(e -> {
@@ -493,7 +426,7 @@ public class BlockOutputStream extends OutputStream {
throw new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e);
}
- futureMap.put(flushPos, flushFuture);
+ commitWatcher.getFutureMap().put(flushPos, flushFuture);
return flushFuture;
}
@@ -553,18 +486,7 @@ public class BlockOutputStream extends OutputStream {
executePutBlock();
}
waitOnFlushFutures();
- if (!commitIndex2flushedDataMap.isEmpty()) {
- // wait for the last commit index in the commitIndex2flushedDataMap
- // to get committed to all or majority of nodes in case timeout
- // happens.
- long lastIndex =
- commitIndex2flushedDataMap.keySet().stream().mapToLong(v -> v)
- .max().getAsLong();
- LOG.debug(
- "waiting for last flush Index " + lastIndex + " to catch up");
- watchForCommit(lastIndex);
- }
-
+ watchForCommit(false);
// just check again if the exception is hit while waiting for the
// futures to ensure flush has indeed succeeded
@@ -594,11 +516,11 @@ public class BlockOutputStream extends OutputStream {
}
}
-
private void waitOnFlushFutures()
throws InterruptedException, ExecutionException {
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(
- futureMap.values().toArray(new CompletableFuture[futureMap.size()]));
+ commitWatcher.getFutureMap().values().toArray(
+ new CompletableFuture[commitWatcher.getFutureMap().size()]));
// wait for all the transactions to complete
combinedFuture.get();
}
@@ -637,18 +559,11 @@ public class BlockOutputStream extends OutputStream {
}
xceiverClientManager = null;
xceiverClient = null;
- if (futureMap != null) {
- futureMap.clear();
- }
- futureMap = null;
+ commitWatcher.cleanup();
if (bufferList != null) {
bufferList.clear();
}
bufferList = null;
- if (commitIndex2flushedDataMap != null) {
- commitIndex2flushedDataMap.clear();
- }
- commitIndex2flushedDataMap = null;
responseExecutor.shutdown();
}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
new file mode 100644
index 0000000..aeac941
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This class maintains the map of the commitIndexes to be watched for
+ * successful replication in the datanodes in a given pipeline. It also releases
+ * the buffers associated with the user data back to {@link BufferPool} once
+ * minimum replication criteria is achieved during an ozone key write.
+ */
+package org.apache.hadoop.hdds.scm.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.ExecutionException;
+
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.stream.Collectors;
+
+/**
+ * This class executes watchForCommit on ratis pipeline and releases
+ * buffers once data successfully gets replicated.
+ * <p>
+ * It tracks the buffers flushed by each putBlock call, keyed by the commit
+ * (log) index of that call, together with the putBlock futures. Once a commit
+ * index is reported as replicated, the corresponding buffers are returned to
+ * the {@link BufferPool} and the acknowledged data length is advanced.
+ */
+public class CommitWatcher {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CommitWatcher.class);
+
+  // A reference to the pool of buffers holding the data
+  private final BufferPool bufferPool;
+
+  // The map should maintain the keys (logIndexes) in order so that while
+  // removing we always end up updating incremented data flushed length.
+  // Also, corresponding to the logIndex, the corresponding list of buffers will
+  // be released from the buffer pool. Set to null by cleanup().
+  private ConcurrentSkipListMap<Long, List<ByteBuffer>>
+      commitIndex2flushedDataMap;
+
+  // future Map to hold up all putBlock futures. releaseBuffers() removes
+  // entries keyed by the cumulative acknowledged data length.
+  private final ConcurrentHashMap<Long,
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
+      futureMap;
+
+  private final XceiverClientSpi xceiverClient;
+
+  // timeout passed to every XceiverClientSpi#watchForCommit call
+  private final long watchTimeout;
+
+  // total data which has been successfully flushed and acknowledged
+  // by all servers
+  private long totalAckDataLength;
+
+  /**
+   * Creates a watcher bound to the given buffer pool and client.
+   *
+   * @param bufferPool pool to which acknowledged buffers are released
+   * @param xceiverClient client used to issue watchForCommit requests
+   * @param watchTimeout timeout passed to each watchForCommit request
+   */
+  public CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient,
+      long watchTimeout) {
+    this.bufferPool = bufferPool;
+    this.xceiverClient = xceiverClient;
+    this.watchTimeout = watchTimeout;
+    commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();
+    totalAckDataLength = 0;
+    futureMap = new ConcurrentHashMap<>();
+  }
+
+  /**
+   * Releases the buffers registered for the given commit indexes back to the
+   * buffer pool and advances totalAckDataLength by their combined length.
+   * In case of failure, data will be re-read starting from
+   * totalAckDataLength.
+   *
+   * @param indexes commit indexes to release; expected in ascending order so
+   *                that totalAckDataLength lines up with the futureMap keys
+   * @return the updated totalAckDataLength
+   */
+  private long releaseBuffers(List<Long> indexes) {
+    Preconditions.checkArgument(!commitIndex2flushedDataMap.isEmpty());
+    for (long index : indexes) {
+      List<ByteBuffer> buffers = commitIndex2flushedDataMap.remove(index);
+      Preconditions.checkState(buffers != null,
+          "No buffers registered for commit index %s", index);
+      long length = buffers.stream().mapToLong(ByteBuffer::position).sum();
+      totalAckDataLength += length;
+      LOG.debug("Total data successfully replicated: {}", totalAckDataLength);
+      // clear the future object from the future Map
+      Preconditions.checkNotNull(futureMap.remove(totalAckDataLength));
+      for (ByteBuffer byteBuffer : buffers) {
+        bufferPool.releaseBuffer(byteBuffer);
+      }
+    }
+    return totalAckDataLength;
+  }
+
+  /**
+   * Records the buffers flushed by the putBlock call that was assigned the
+   * given commit (log) index.
+   *
+   * @param index commit index of the putBlock call
+   * @param byteBufferList buffers written since the previous putBlock
+   */
+  public void updateCommitInfoMap(long index,
+      List<ByteBuffer> byteBufferList) {
+    commitIndex2flushedDataMap.put(index, byteBufferList);
+  }
+
+  int getCommitInfoMapSize() {
+    return commitIndex2flushedDataMap.size();
+  }
+
+  /**
+   * Calls watch for commit for the first (lowest) index in
+   * commitIndex2flushedDataMap to the Ratis client.
+   * @return reply from raft client, or null if there is nothing to watch
+   * @throws IOException in case watchForCommit fails
+   */
+  public XceiverClientReply watchOnFirstIndex() throws IOException {
+    if (commitIndex2flushedDataMap.isEmpty()) {
+      return null;
+    }
+    // wait for the first commit index in the commitIndex2flushedDataMap
+    // to get committed to all or majority of nodes in case timeout
+    // happens. The skip list is sorted, so the first key is the minimum.
+    long index = commitIndex2flushedDataMap.firstKey();
+    LOG.debug("waiting for first index {} to catch up", index);
+    return watchForCommit(index);
+  }
+
+  /**
+   * Calls watch for commit for the last (highest) index in
+   * commitIndex2flushedDataMap to the Ratis client.
+   * @return reply from raft client, or null if there is nothing to watch
+   * @throws IOException in case watchForCommit fails
+   */
+  public XceiverClientReply watchOnLastIndex()
+      throws IOException {
+    if (commitIndex2flushedDataMap.isEmpty()) {
+      return null;
+    }
+    // wait for the last commit index in the commitIndex2flushedDataMap
+    // to get committed to all or majority of nodes in case timeout
+    // happens. The skip list is sorted, so the last key is the maximum.
+    long index = commitIndex2flushedDataMap.lastKey();
+    LOG.debug("waiting for last flush Index {} to catch up", index);
+    return watchForCommit(index);
+  }
+
+  /**
+   * Releases every tracked entry whose commit index is less than or equal to
+   * {@code commitIndex}.
+   */
+  private void adjustBuffers(long commitIndex) {
+    // headMap(k, true) is an ordered view of all keys <= commitIndex; copy it
+    // because releaseBuffers() removes entries from the underlying map.
+    List<Long> keyList = commitIndex2flushedDataMap.headMap(commitIndex, true)
+        .keySet().stream().collect(Collectors.toList());
+    if (!keyList.isEmpty()) {
+      releaseBuffers(keyList);
+    }
+  }
+
+  // It may happen that once the exception is encountered , we still might
+  // have successfully flushed up to a certain index. Make sure the buffers
+  // only contain data which have not been sufficiently replicated
+  void releaseBuffersOnException() {
+    adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
+  }
+
+  /**
+   * calls watchForCommit API of the Ratis Client. For Standalone client,
+   * it is a no op.
+   * @param commitIndex log index to watch for
+   * @return the reply from the client (may be null); buffers up to the log
+   *         index carried by the reply have already been released
+   * @throws IOException IOException in case watch gets timed out, fails or is
+   *         interrupted
+   */
+  public XceiverClientReply watchForCommit(long commitIndex)
+      throws IOException {
+    Preconditions.checkState(!commitIndex2flushedDataMap.isEmpty());
+    try {
+      XceiverClientReply reply =
+          xceiverClient.watchForCommit(commitIndex, watchTimeout);
+      long index = reply == null ? 0 : reply.getLogIndex();
+      adjustBuffers(index);
+      return reply;
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so that callers can observe it.
+      Thread.currentThread().interrupt();
+      throw handleWatchFailure(commitIndex, e);
+    } catch (TimeoutException | ExecutionException e) {
+      throw handleWatchFailure(commitIndex, e);
+    }
+  }
+
+  /**
+   * Logs a failed watch, releases whatever was replicated anyway and builds
+   * the IOException to be thrown to the caller.
+   */
+  private IOException handleWatchFailure(long commitIndex, Exception e) {
+    LOG.warn("watchForCommit failed for index {}", commitIndex, e);
+    IOException ioException = new IOException(
+        "Unexpected Storage Container Exception: " + e.toString(), e);
+    releaseBuffersOnException();
+    return ioException;
+  }
+
+  @VisibleForTesting
+  public ConcurrentSkipListMap<Long,
+      List<ByteBuffer>> getCommitIndex2flushedDataMap() {
+    return commitIndex2flushedDataMap;
+  }
+
+  public ConcurrentHashMap<Long,
+      CompletableFuture<ContainerProtos.
+          ContainerCommandResponseProto>> getFutureMap() {
+    return futureMap;
+  }
+
+  public long getTotalAckDataLength() {
+    return totalAckDataLength;
+  }
+
+  /**
+   * Clears both maps. After this call commitIndex2flushedDataMap is null, so
+   * the watcher must not be used again.
+   */
+  public void cleanup() {
+    if (commitIndex2flushedDataMap != null) {
+      commitIndex2flushedDataMap.clear();
+    }
+    if (futureMap != null) {
+      futureMap.clear();
+    }
+    commitIndex2flushedDataMap = null;
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
new file mode 100644
index 0000000..ea51900
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.scm.storage.BufferPool;
+import org.apache.hadoop.hdds.scm.storage.CommitWatcher;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Class to test CommitWatcher functionality.
+ */
+public class TestCommitWatcher {
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf = new OzoneConfiguration();
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static int chunkSize;
+  private static long flushSize;
+  private static long maxFlushSize;
+  private static long blockSize;
+  private static String volumeName;
+  private static String bucketName;
+  private static String keyString;
+  private static StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+  private static String containerOwner = "OZONE";
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   *
+   * @throws Exception if the cluster cannot be started
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    chunkSize = (int)(1 * OzoneConsts.MB);
+    flushSize = 2 * chunkSize;
+    maxFlushSize = 2 * flushSize;
+    blockSize = 2 * maxFlushSize;
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
+    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
+    conf.setQuietMode(false);
+    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
+        StorageUnit.MB);
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(7)
+        .setBlockSize(blockSize)
+        .setChunkSize(chunkSize)
+        .setStreamBufferFlushSize(flushSize)
+        .setStreamBufferMaxSize(maxFlushSize)
+        .setStreamBufferSizeUnit(StorageUnit.BYTES)
+        .build();
+    cluster.waitForClusterToBeReady();
+    //the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getClient(conf);
+    objectStore = client.getObjectStore();
+    keyString = UUID.randomUUID().toString();
+    volumeName = "testblockoutputstream";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+    storageContainerLocationClient = cluster
+        .getStorageContainerLocationClient();
+  }
+
+  /**
+   * Shutdown MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testReleaseBuffers() throws Exception {
+    int capacity = 2;
+    BufferPool bufferPool = new BufferPool(chunkSize, capacity);
+    XceiverClientManager clientManager = new XceiverClientManager(conf);
+    ContainerWithPipeline container = storageContainerLocationClient
+        .allocateContainer(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, containerOwner);
+    Pipeline pipeline = container.getPipeline();
+    long containerId = container.getContainerInfo().getContainerID();
+    XceiverClientSpi xceiverClient = clientManager.acquireClient(pipeline);
+    Assert.assertEquals(1, xceiverClient.getRefcount());
+    Assert.assertTrue(xceiverClient instanceof XceiverClientRatis);
+    XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
+    CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient, 10000);
+    BlockID blockID = ContainerTestHelper.getTestBlockID(containerId);
+    List<XceiverClientReply> replies = new ArrayList<>();
+    long length = 0;
+    List<CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
+        futures = new ArrayList<>();
+    for (int i = 0; i < capacity; i++) {
+      ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+          ContainerTestHelper
+              .getWriteChunkRequest(pipeline, blockID, chunkSize);
+      // add the data to the buffer pool
+      ByteBuffer byteBuffer = bufferPool.allocateBufferIfNeeded().put(
+          writeChunkRequest.getWriteChunk().getData().asReadOnlyByteBuffer());
+      ratisClient.sendCommandAsync(writeChunkRequest);
+      ContainerProtos.ContainerCommandRequestProto putBlockRequest =
+          ContainerTestHelper
+              .getPutBlockRequest(pipeline, writeChunkRequest.getWriteChunk());
+      XceiverClientReply reply = ratisClient.sendCommandAsync(putBlockRequest);
+      // Each putBlock needs its own list: the watcher keeps a reference to it,
+      // so a list shared across iterations would make every commit index
+      // point at the last buffer written.
+      List<ByteBuffer> bufferList = new ArrayList<>();
+      bufferList.add(byteBuffer);
+      length += byteBuffer.position();
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
+          reply.getResponse().thenApply(v -> {
+            watcher.updateCommitInfoMap(reply.getLogIndex(), bufferList);
+            return v;
+          });
+      futures.add(future);
+      watcher.getFutureMap().put(length, future);
+      replies.add(reply);
+    }
+
+    Assert.assertEquals(capacity, replies.size());
+    // wait on the 1st putBlock to complete
+    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future1 =
+        futures.get(0);
+    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future2 =
+        futures.get(1);
+    future1.get();
+    // futureMap is keyed by Long: lookups must use long keys, an Integer key
+    // never matches and would make the assertions below vacuous.
+    Assert.assertSame(future1, watcher.getFutureMap().get((long) chunkSize));
+    // wait on 2nd putBlock to complete
+    future2.get();
+    Assert.assertSame(future2, watcher.getFutureMap().get(2L * chunkSize));
+    Assert.assertEquals(2, watcher.getCommitIndex2flushedDataMap().size());
+    watcher.watchOnFirstIndex();
+    Assert.assertFalse(watcher.getCommitIndex2flushedDataMap()
+        .containsKey(replies.get(0).getLogIndex()));
+    Assert.assertFalse(watcher.getFutureMap().containsKey((long) chunkSize));
+    Assert.assertTrue(watcher.getTotalAckDataLength() >= chunkSize);
+    watcher.watchOnLastIndex();
+    Assert.assertFalse(watcher.getCommitIndex2flushedDataMap()
+        .containsKey(replies.get(1).getLogIndex()));
+    Assert.assertFalse(watcher.getFutureMap().containsKey(2L * chunkSize));
+    Assert.assertEquals(2L * chunkSize, watcher.getTotalAckDataLength());
+    Assert.assertTrue(watcher.getFutureMap().isEmpty());
+    Assert.assertTrue(watcher.getCommitIndex2flushedDataMap().isEmpty());
+  }
+
+  @Test
+  public void testReleaseBuffersOnException() throws Exception {
+    int capacity = 2;
+    BufferPool bufferPool = new BufferPool(chunkSize, capacity);
+    XceiverClientManager clientManager = new XceiverClientManager(conf);
+    ContainerWithPipeline container = storageContainerLocationClient
+        .allocateContainer(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, containerOwner);
+    Pipeline pipeline = container.getPipeline();
+    long containerId = container.getContainerInfo().getContainerID();
+    XceiverClientSpi xceiverClient = clientManager.acquireClient(pipeline);
+    Assert.assertEquals(1, xceiverClient.getRefcount());
+    Assert.assertTrue(xceiverClient instanceof XceiverClientRatis);
+    XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
+    CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient, 10000);
+    BlockID blockID = ContainerTestHelper.getTestBlockID(containerId);
+    List<XceiverClientReply> replies = new ArrayList<>();
+    long length = 0;
+    List<CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
+        futures = new ArrayList<>();
+    for (int i = 0; i < capacity; i++) {
+      ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+          ContainerTestHelper
+              .getWriteChunkRequest(pipeline, blockID, chunkSize);
+      // add the data to the buffer pool
+      ByteBuffer byteBuffer = bufferPool.allocateBufferIfNeeded().put(
+          writeChunkRequest.getWriteChunk().getData().asReadOnlyByteBuffer());
+      ratisClient.sendCommandAsync(writeChunkRequest);
+      ContainerProtos.ContainerCommandRequestProto putBlockRequest =
+          ContainerTestHelper
+              .getPutBlockRequest(pipeline, writeChunkRequest.getWriteChunk());
+      XceiverClientReply reply = ratisClient.sendCommandAsync(putBlockRequest);
+      // Each putBlock needs its own list: the watcher keeps a reference to it,
+      // so a list shared across iterations would make every commit index
+      // point at the last buffer written.
+      List<ByteBuffer> bufferList = new ArrayList<>();
+      bufferList.add(byteBuffer);
+      length += byteBuffer.position();
+      CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
+          reply.getResponse().thenApply(v -> {
+            watcher.updateCommitInfoMap(reply.getLogIndex(), bufferList);
+            return v;
+          });
+      futures.add(future);
+      watcher.getFutureMap().put(length, future);
+      replies.add(reply);
+    }
+
+    Assert.assertEquals(capacity, replies.size());
+    // wait on the 1st putBlock to complete
+    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future1 =
+        futures.get(0);
+    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future2 =
+        futures.get(1);
+    future1.get();
+    // futureMap is keyed by Long: lookups must use long keys, an Integer key
+    // never matches and would make the assertions below vacuous.
+    Assert.assertSame(future1, watcher.getFutureMap().get((long) chunkSize));
+    // wait on 2nd putBlock to complete
+    future2.get();
+    Assert.assertSame(future2, watcher.getFutureMap().get(2L * chunkSize));
+    Assert.assertEquals(2, watcher.getCommitIndex2flushedDataMap().size());
+    watcher.watchOnFirstIndex();
+    Assert.assertFalse(watcher.getCommitIndex2flushedDataMap()
+        .containsKey(replies.get(0).getLogIndex()));
+    Assert.assertFalse(watcher.getFutureMap().containsKey((long) chunkSize));
+    Assert.assertTrue(watcher.getTotalAckDataLength() >= chunkSize);
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
+    try {
+      // just watch for a higher index so as to ensure, it does an actual
+      // call to Ratis. Otherwise, it may just return in case the commitInfoMap
+      // is updated to the latest index in putBlock response.
+      watcher.watchForCommit(replies.get(1).getLogIndex() + 1);
+      Assert.fail("watchForCommit should fail after two of the three "
+          + "datanodes in the pipeline are shut down");
+    } catch (IOException ioe) {
+      Assert.assertTrue(ioe.getCause() instanceof TimeoutException);
+    }
+    long lastIndex = replies.get(1).getLogIndex();
+    // Read once so that the assertion and the branch below agree.
+    long minCommitIndex = ratisClient.getReplicatedMinCommitIndex();
+    // Depending on the last successfully replicated commitIndex, either we
+    // discard only 1st buffer or both buffers
+    Assert.assertTrue(minCommitIndex <= lastIndex);
+    if (minCommitIndex < lastIndex) {
+      Assert.assertEquals(chunkSize, watcher.getTotalAckDataLength());
+      Assert.assertEquals(1, watcher.getCommitIndex2flushedDataMap().size());
+      Assert.assertEquals(1, watcher.getFutureMap().size());
+    } else {
+      Assert.assertEquals(2L * chunkSize, watcher.getTotalAckDataLength());
+      Assert.assertTrue(watcher.getFutureMap().isEmpty());
+      Assert.assertTrue(watcher.getCommitIndex2flushedDataMap().isEmpty());
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org