Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2019/03/14 14:03:37 UTC
[hadoop] branch trunk updated: HDDS-1257. Incorrect object because of mismatch in block lengths. Contributed by Shashikant Banerjee.
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new d60673c HDDS-1257. Incorrect object because of mismatch in block lengths. Contributed by Shashikant Banerjee.
d60673c is described below
commit d60673c47077d69320ae1bd37c6b74489bef25f7
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Thu Mar 14 19:32:36 2019 +0530
HDDS-1257. Incorrect object because of mismatch in block lengths. Contributed by Shashikant Banerjee.
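The root cause: commitIndex2flushedDataMap was a ConcurrentHashMap, whose key set iterates in no guaranteed order, so entries could be removed out of log-index order and totalAckDataLength could end up holding a smaller, stale flushed length. A minimal sketch of the failure mode (a hypothetical standalone class with made-up indexes and lengths, not code from this commit):

import java.util.concurrent.ConcurrentHashMap;

// Hypothetical repro sketch: iterating a ConcurrentHashMap's key set
// visits keys in no guaranteed order, so the last value written to
// totalAckDataLength may correspond to a smaller log index.
public class UnorderedAckSketch {
  public static void main(String[] args) {
    ConcurrentHashMap<Long, Long> commitIndex2flushedDataMap =
        new ConcurrentHashMap<>();
    commitIndex2flushedDataMap.put(10L, 1000L); // logIndex -> flushed length
    commitIndex2flushedDataMap.put(20L, 2000L);
    long totalAckDataLength = 0;
    for (long index : commitIndex2flushedDataMap.keySet()) {
      // If index 20 is visited before 10, totalAckDataLength ends at 1000
      // even though 2000 bytes were actually acknowledged.
      totalAckDataLength = commitIndex2flushedDataMap.remove(index);
    }
    System.out.println("Acked: " + totalAckDataLength);
  }
}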
---
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 38 ++++++++++++++--------
1 file changed, 25 insertions(+), 13 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index fe41f57..13913ee 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -46,6 +46,7 @@ import java.util.UUID;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.*;
+import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls
.putBlockAsync;
@@ -108,7 +109,10 @@ public class BlockOutputStream extends OutputStream {
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>>
futureMap;
// Map containing the mapping from putBlock logIndex to flushedDataLength.
- private ConcurrentHashMap<Long, Long> commitIndex2flushedDataMap;
+
+ // The map should maintain the keys (logIndexes) in order so that, while
+ // removing entries, the flushed data length is always updated in increasing order.
+ private ConcurrentSkipListMap<Long, Long> commitIndex2flushedDataMap;
private List<DatanodeDetails> failedServers;
@@ -157,7 +161,7 @@ public class BlockOutputStream extends OutputStream {
// A single thread executor handles the responses of async requests
responseExecutor = Executors.newSingleThreadExecutor();
- commitIndex2flushedDataMap = new ConcurrentHashMap<>();
+ commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();
totalAckDataLength = 0;
futureMap = new ConcurrentHashMap<>();
totalDataFlushedLength = 0;
@@ -206,7 +210,7 @@ public class BlockOutputStream extends OutputStream {
int writeLen;
// Allocate a buffer if needed. The buffer will be allocated only
- // once as needed and will be reused again for mutiple blockOutputStream
+ // once as needed and will be reused again for multiple blockOutputStream
// entries.
ByteBuffer currentBuffer = bufferPool.allocateBufferIfNeeded();
int pos = currentBuffer.position();
@@ -281,10 +285,18 @@ public class BlockOutputStream extends OutputStream {
* just update the totalAckDataLength. In case of failure,
* we will read the data starting from totalAckDataLength.
*/
- private void updateFlushIndex(long index) {
- if (!commitIndex2flushedDataMap.isEmpty()) {
+ private void updateFlushIndex(List<Long> indexes) {
+ Preconditions.checkArgument(!commitIndex2flushedDataMap.isEmpty());
+ for (long index : indexes) {
Preconditions.checkState(commitIndex2flushedDataMap.containsKey(index));
- totalAckDataLength = commitIndex2flushedDataMap.remove(index);
+ long length = commitIndex2flushedDataMap.remove(index);
+
+ // The totalAckDataLength replicated so far should always be strictly
+ // less than the length just removed from commitIndex2flushedDataMap.
+ // The precondition below ensures that commitIndex2flushedDataMap
+ // entries are removed in ascending order of insertion (log index).
+ Preconditions.checkArgument(totalAckDataLength < length);
+ totalAckDataLength = length;
LOG.debug("Total data successfully replicated: " + totalAckDataLength);
futureMap.remove(totalAckDataLength);
// Flush has been committed to the required servers successfully.
@@ -325,13 +337,13 @@ public class BlockOutputStream extends OutputStream {
}
private void adjustBuffers(long commitIndex) {
- commitIndex2flushedDataMap.keySet().stream().forEach(index -> {
- if (index <= commitIndex) {
- updateFlushIndex(index);
- } else {
- return;
- }
- });
+ List<Long> keyList = commitIndex2flushedDataMap.keySet().stream()
+ .filter(p -> p <= commitIndex).collect(Collectors.toList());
+ if (keyList.isEmpty()) {
+ return;
+ } else {
+ updateFlushIndex(keyList);
+ }
}
// It may happen that once the exception is encountered, we still might
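Taken together: the new adjustBuffers collects every log index up to the committed index, and updateFlushIndex walks them in ascending key order, which the ConcurrentSkipListMap guarantees. A condensed, self-contained sketch of the fixed flow (simplified types, no Guava or logging; not the committed class):

import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;

// Condensed sketch of the fix: a ConcurrentSkipListMap keeps log indexes
// sorted, so flushed lengths are acknowledged in strictly increasing order.
public class OrderedAckSketch {
  private final ConcurrentSkipListMap<Long, Long> commitIndex2flushedDataMap =
      new ConcurrentSkipListMap<>();
  private long totalAckDataLength;

  void putFlushed(long logIndex, long flushedLength) {
    commitIndex2flushedDataMap.put(logIndex, flushedLength);
  }

  void adjustBuffers(long commitIndex) {
    // Collect every log index acknowledged by this commit, already sorted.
    List<Long> keyList = commitIndex2flushedDataMap.keySet().stream()
        .filter(p -> p <= commitIndex).collect(Collectors.toList());
    if (!keyList.isEmpty()) {
      updateFlushIndex(keyList);
    }
  }

  private void updateFlushIndex(List<Long> indexes) {
    for (long index : indexes) {
      long length = commitIndex2flushedDataMap.remove(index);
      // Sorted removal guarantees each length strictly exceeds the last.
      if (totalAckDataLength >= length) {
        throw new IllegalStateException("Acked lengths out of order");
      }
      totalAckDataLength = length;
    }
  }
}

Because the skip-list map hands keyList over already sorted, the strict-increase check in updateFlushIndex can never trip for in-order commits, which is exactly the invariant the committed Preconditions.checkArgument enforces.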