You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2018/09/05 00:11:01 UTC

hadoop git commit: HDDS-383. Ozone Client should discard preallocated blocks from closed containers. Contributed by Shashikant Banerjee

Repository: hadoop
Updated Branches:
  refs/heads/trunk 6e4c73147 -> 6883fe860


HDDS-383. Ozone Client should discard preallocated blocks from closed containers. Contributed by Shashikant Banerjee


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6883fe86
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6883fe86
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6883fe86

Branch: refs/heads/trunk
Commit: 6883fe860f484da2b835f9f57307b84165ed7f6f
Parents: 6e4c731
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Tue Sep 4 17:10:10 2018 -0700
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Tue Sep 4 17:10:44 2018 -0700

----------------------------------------------------------------------
 .../ozone/client/io/ChunkGroupOutputStream.java | 86 +++++++++++++-------
 .../hadoop/ozone/om/helpers/OmKeyInfo.java      |  4 +
 .../rpc/TestCloseContainerHandlingByClient.java | 64 ++++++++++-----
 3 files changed, 102 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6883fe86/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index 21406b5..3742a9a 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -53,6 +53,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.ListIterator;
 
 /**
  * Maintaining a list of ChunkInputStream. Write based on offset.
@@ -81,7 +82,6 @@ public class ChunkGroupOutputStream extends OutputStream {
   private final int chunkSize;
   private final String requestID;
   private boolean closed;
-  private List<OmKeyLocationInfo> locationInfoList;
   private final RetryPolicy retryPolicy;
   /**
    * A constructor for testing purpose only.
@@ -97,7 +97,6 @@ public class ChunkGroupOutputStream extends OutputStream {
     chunkSize = 0;
     requestID = null;
     closed = false;
-    locationInfoList = null;
     retryPolicy = null;
   }
 
@@ -118,9 +117,16 @@ public class ChunkGroupOutputStream extends OutputStream {
     return streamEntries;
   }
 
-  @VisibleForTesting
-  public long getOpenID() {
-    return openID;
+  public List<OmKeyLocationInfo> getLocationInfoList() {
+    List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
+    for (ChunkOutputStreamEntry streamEntry : streamEntries) {
+      OmKeyLocationInfo info =
+          new OmKeyLocationInfo.Builder().setBlockID(streamEntry.blockID)
+              .setShouldCreateContainer(false)
+              .setLength(streamEntry.currentPosition).setOffset(0).build();
+      locationInfoList.add(info);
+    }
+    return locationInfoList;
   }
 
   public ChunkGroupOutputStream(
@@ -146,7 +152,6 @@ public class ChunkGroupOutputStream extends OutputStream {
     this.xceiverClientManager = xceiverClientManager;
     this.chunkSize = chunkSize;
     this.requestID = requestId;
-    this.locationInfoList = new ArrayList<>();
     this.retryPolicy = retryPolicy;
     LOG.debug("Expecting open key with one block, but got" +
         info.getKeyLocationVersions().size());
@@ -211,18 +216,6 @@ public class ChunkGroupOutputStream extends OutputStream {
     streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(),
         keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
         chunkSize, subKeyInfo.getLength()));
-    // reset the original length to zero here. It will be updated as and when
-    // the data gets written.
-    subKeyInfo.setLength(0);
-    locationInfoList.add(subKeyInfo);
-  }
-
-  private void incrementBlockLength(int index, long length) {
-    if (locationInfoList != null) {
-      OmKeyLocationInfo locationInfo = locationInfoList.get(index);
-      long originalLength = locationInfo.getLength();
-      locationInfo.setLength(originalLength + length);
-    }
   }
 
   @VisibleForTesting
@@ -298,7 +291,6 @@ public class ChunkGroupOutputStream extends OutputStream {
           throw ioe;
         }
       }
-      incrementBlockLength(currentStreamIndex, writeLen);
       if (current.getRemaining() <= 0) {
         // since the current block is already written close the stream.
         handleFlushOrClose(true);
@@ -316,12 +308,6 @@ public class ChunkGroupOutputStream extends OutputStream {
     ContainerProtos.GetCommittedBlockLengthResponseProto responseProto;
     RetryPolicy.RetryAction action;
     int numRetries = 0;
-
-    // TODO : At this point of time, we also need to allocate new blocks
-    // from a different container and may need to nullify
-    // all the remaining pre-allocated blocks in case they were
-    // pre-allocated on the same container which got closed now.This needs
-    // caching the closed container list on the client itself.
     while (true) {
       try {
         responseProto = ContainerProtocolCalls
@@ -367,6 +353,43 @@ public class ChunkGroupOutputStream extends OutputStream {
   }
 
   /**
+   * Discards the subsequent pre allocated blocks and removes the streamEntries
+   * from the streamEntries list for the container which is closed.
+   * @param containerID id of the closed container
+   */
+  private void discardPreallocatedBlocks(long containerID) {
+    // currentStreamIndex < streamEntries.size() signifies that there are still
+    // pre allocated blocks available.
+    if (currentStreamIndex < streamEntries.size()) {
+      ListIterator<ChunkOutputStreamEntry> streamEntryIterator =
+          streamEntries.listIterator(currentStreamIndex);
+      while (streamEntryIterator.hasNext()) {
+        if (streamEntryIterator.next().blockID.getContainerID()
+            == containerID) {
+          streamEntryIterator.remove();
+        }
+      }
+    }
+  }
+
+  /**
+   * It might be possible that the blocks pre allocated might never get written
+   * while the stream gets closed normally. In such cases, it would be a good
+   * idea to trim down the locationInfoList by removing the unused blocks if any
+   * so that only the used block info gets updated on OzoneManager during close.
+   */
+  private void removeEmptyBlocks() {
+    if (currentStreamIndex < streamEntries.size()) {
+      ListIterator<ChunkOutputStreamEntry> streamEntryIterator =
+          streamEntries.listIterator(currentStreamIndex);
+      while (streamEntryIterator.hasNext()) {
+        if (streamEntryIterator.next().currentPosition == 0) {
+          streamEntryIterator.remove();
+        }
+      }
+    }
+  }
+  /**
    * It performs following actions :
    * a. Updates the committed length at datanode for the current stream in
    *    datanode.
@@ -396,7 +419,7 @@ public class ChunkGroupOutputStream extends OutputStream {
         || streamEntry.currentPosition != buffer.position()) {
       committedLength = getCommittedBlockLength(streamEntry);
       // update the length of the current stream
-      locationInfoList.get(streamIndex).setLength(committedLength);
+      streamEntry.currentPosition = committedLength;
     }
 
     if (buffer.position() > 0) {
@@ -418,10 +441,12 @@ public class ChunkGroupOutputStream extends OutputStream {
     // written. Remove it from the current stream list.
     if (committedLength == 0) {
       streamEntries.remove(streamIndex);
-      locationInfoList.remove(streamIndex);
       Preconditions.checkArgument(currentStreamIndex != 0);
       currentStreamIndex -= 1;
     }
+    // discard subsequent pre allocated blocks from the streamEntries list
+    // from the closed container
+    discardPreallocatedBlocks(streamEntry.blockID.getContainerID());
   }
 
   private boolean checkIfContainerIsClosed(IOException ioe) {
@@ -433,7 +458,7 @@ public class ChunkGroupOutputStream extends OutputStream {
   }
 
   private long getKeyLength() {
-    return locationInfoList.parallelStream().mapToLong(e -> e.getLength())
+    return streamEntries.parallelStream().mapToLong(e -> e.currentPosition)
         .sum();
   }
 
@@ -506,12 +531,11 @@ public class ChunkGroupOutputStream extends OutputStream {
     handleFlushOrClose(true);
     if (keyArgs != null) {
       // in test, this could be null
-      Preconditions.checkState(streamEntries.size() == locationInfoList.size());
+      removeEmptyBlocks();
       Preconditions.checkState(byteOffset == getKeyLength());
       keyArgs.setDataSize(byteOffset);
-      keyArgs.setLocationInfoList(locationInfoList);
+      keyArgs.setLocationInfoList(getLocationInfoList());
       omClient.commitKey(keyArgs, openID);
-      locationInfoList = null;
     } else {
       LOG.warn("Closing ChunkGroupOutputStream, but key args is null");
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6883fe86/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index f6e4265..50f4b17 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -122,6 +122,7 @@ public final class OmKeyInfo {
    * @throws IOException
    */
   public void updateLocationInfoList(List<OmKeyLocationInfo> locationInfoList) {
+    long latestVersion = getLatestVersionLocations().getVersion();
     OmKeyLocationInfoGroup keyLocationInfoGroup = getLatestVersionLocations();
     List<OmKeyLocationInfo> currentList =
         keyLocationInfoGroup.getLocationList();
@@ -134,6 +135,9 @@ public final class OmKeyInfo {
     // might get closed. The diff of blocks between these two lists here
     // need to be garbage collected in case the ozone client dies.
     currentList.removeAll(latestVersionList);
+    // set each of the locationInfo object to the latest version
+    locationInfoList.stream().forEach(omKeyLocationInfo -> omKeyLocationInfo
+        .setCreateVersion(latestVersion));
     currentList.addAll(locationInfoList);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6883fe86/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
index 9f12633..cf38982 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
@@ -38,12 +38,10 @@ import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
-import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -247,9 +245,8 @@ public class TestCloseContainerHandlingByClient {
 
   @Test
   public void testMultiBlockWrites2() throws Exception {
-
     String keyName = "standalone4";
-    long dataLength = 0;
+    long dataLength;
     OzoneOutputStream key =
         createKey(keyName, ReplicationType.STAND_ALONE, 4 * blockSize);
     ChunkGroupOutputStream groupOutputStream =
@@ -293,11 +290,10 @@ public class TestCloseContainerHandlingByClient {
   private void waitForContainerClose(String keyName,
       OzoneOutputStream outputStream, HddsProtos.ReplicationType type)
       throws Exception {
-
     ChunkGroupOutputStream groupOutputStream =
         (ChunkGroupOutputStream) outputStream.getOutputStream();
     List<OmKeyLocationInfo> locationInfoList =
-        getLocationInfos(groupOutputStream, keyName);
+        groupOutputStream.getLocationInfoList();
     List<Long> containerIdList = new ArrayList<>();
     List<Pipeline> pipelineList = new ArrayList<>();
     for (OmKeyLocationInfo info : locationInfoList) {
@@ -335,6 +331,46 @@ public class TestCloseContainerHandlingByClient {
     }
   }
 
+  @Test
+  public void testDiscardPreallocatedBlocks() throws Exception {
+    String keyName = "discardpreallocatedblocks";
+    OzoneOutputStream key =
+        createKey(keyName, ReplicationType.STAND_ALONE, 2 * blockSize);
+    ChunkGroupOutputStream groupOutputStream =
+        (ChunkGroupOutputStream) key.getOutputStream();
+
+    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    // With the initial size provided, it should have pre allocated 2 blocks
+    Assert.assertEquals(2, groupOutputStream.getStreamEntries().size());
+    Assert.assertEquals(2, groupOutputStream.getLocationInfoList().size());
+    String dataString = fixedLengthString(keyString, (1 * blockSize));
+    byte[] data = dataString.getBytes();
+    key.write(data);
+    List<OmKeyLocationInfo> locationInfos =
+        new ArrayList<>(groupOutputStream.getLocationInfoList());
+    long containerID = locationInfos.get(0).getContainerID();
+    List<DatanodeDetails> datanodes =
+        cluster.getStorageContainerManager().getScmContainerManager()
+            .getContainerWithPipeline(containerID).getPipeline().getMachines();
+    Assert.assertEquals(1, datanodes.size());
+    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE);
+    dataString = fixedLengthString(keyString, (1 * blockSize));
+    data = dataString.getBytes();
+    key.write(data);
+    Assert.assertEquals(2, groupOutputStream.getStreamEntries().size());
+
+    // the 1st block got written. Now all the containers are closed, so the 2nd
+    // pre allocated block will be removed from the list and new block should
+    // have been allocated
+    Assert.assertTrue(
+        groupOutputStream.getLocationInfoList().get(0).getBlockID()
+            .equals(locationInfos.get(0).getBlockID()));
+    Assert.assertFalse(
+        groupOutputStream.getLocationInfoList().get(1).getBlockID()
+            .equals(locationInfos.get(1).getBlockID()));
+    key.close();
+  }
+
   private OzoneOutputStream createKey(String keyName, ReplicationType type,
       long size) throws Exception {
     ReplicationFactor factor =
@@ -344,20 +380,6 @@ public class TestCloseContainerHandlingByClient {
         .createKey(keyName, size, type, factor);
   }
 
-  private List<OmKeyLocationInfo> getLocationInfos(
-      ChunkGroupOutputStream groupOutputStream, String keyName)
-      throws IOException {
-    long clientId = groupOutputStream.getOpenID();
-    OMMetadataManager metadataManager =
-        cluster.getOzoneManager().getMetadataManager();
-    byte[] openKey = metadataManager
-        .getOpenKeyBytes(volumeName, bucketName, keyName, clientId);
-    byte[] openKeyData = metadataManager.getOpenKeyTable().get(openKey);
-    OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(
-        OzoneManagerProtocolProtos.KeyInfo.parseFrom(openKeyData));
-    return keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
-  }
-
   private void validateData(String keyName, byte[] data) throws Exception {
     byte[] readData = new byte[data.length];
     OzoneInputStream is =
@@ -427,7 +449,7 @@ public class TestCloseContainerHandlingByClient {
     String dataString = fixedLengthString(keyString, (3 * chunkSize));
     key.write(dataString.getBytes());
     List<OmKeyLocationInfo> locationInfos =
-        getLocationInfos(groupOutputStream, keyName);
+        groupOutputStream.getLocationInfoList();
     long containerID = locationInfos.get(0).getContainerID();
     List<DatanodeDetails> datanodes =
         cluster.getStorageContainerManager().getScmContainerManager()


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org