You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2018/08/31 16:10:42 UTC

[16/47] hadoop git commit: HDDS-247. Handle CLOSED_CONTAINER_IO exception in ozoneClient. Contributed by Shashikant Banerjee.

HDDS-247. Handle CLOSED_CONTAINER_IO exception in ozoneClient. Contributed by Shashikant Banerjee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3974427f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3974427f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3974427f

Branch: refs/heads/HDFS-12943
Commit: 3974427f67299496e13b04f0d006d367b705fcb5
Parents: 26c2a97
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Tue Aug 28 07:11:36 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Tue Aug 28 07:12:07 2018 +0530

----------------------------------------------------------------------
 .../hdds/scm/storage/ChunkOutputStream.java     |  28 +-
 .../ozone/client/io/ChunkGroupOutputStream.java | 195 +++++++--
 .../hadoop/ozone/om/helpers/OmKeyInfo.java      |  23 +-
 .../rpc/TestCloseContainerHandlingByClient.java | 408 +++++++++++++++++++
 .../ozone/container/ContainerTestHelper.java    |  21 +
 .../hadoop/ozone/om/TestOmBlockVersioning.java  |  16 +-
 6 files changed, 630 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3974427f/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
index 779e636..7309434 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
@@ -94,6 +94,10 @@ public class ChunkOutputStream extends OutputStream {
     this.chunkIndex = 0;
   }
 
+  public ByteBuffer getBuffer() {
+    return buffer;
+  }
+
   @Override
   public synchronized void write(int b) throws IOException {
     checkOpen();
@@ -106,7 +110,8 @@ public class ChunkOutputStream extends OutputStream {
   }
 
   @Override
-  public void write(byte[] b, int off, int len) throws IOException {
+  public synchronized void write(byte[] b, int off, int len)
+      throws IOException {
     if (b == null) {
       throw new NullPointerException();
     }
@@ -143,24 +148,27 @@ public class ChunkOutputStream extends OutputStream {
 
   @Override
   public synchronized void close() throws IOException {
-    if (xceiverClientManager != null && xceiverClient != null &&
-        buffer != null) {
+    if (xceiverClientManager != null && xceiverClient != null
+        && buffer != null) {
+      if (buffer.position() > 0) {
+        writeChunkToContainer();
+      }
       try {
-        if (buffer.position() > 0) {
-          writeChunkToContainer();
-        }
         putKey(xceiverClient, containerKeyData.build(), traceID);
       } catch (IOException e) {
         throw new IOException(
             "Unexpected Storage Container Exception: " + e.toString(), e);
       } finally {
-        xceiverClientManager.releaseClient(xceiverClient);
-        xceiverClientManager = null;
-        xceiverClient = null;
-        buffer = null;
+        cleanup();
       }
     }
+  }
 
+  public synchronized void cleanup() {
+    xceiverClientManager.releaseClient(xceiverClient);
+    xceiverClientManager = null;
+    xceiverClient = null;
+    buffer = null;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3974427f/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index 83b4dfd..988af07 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -46,8 +47,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 /**
  * Maintaining a list of ChunkInputStream. Write based on offset.
@@ -111,6 +114,11 @@ public class ChunkGroupOutputStream extends OutputStream {
     return streamEntries;
   }
 
+  @VisibleForTesting
+  public int getOpenID() {
+    return openID;
+  }
+
   public ChunkGroupOutputStream(
       OpenKeySession handler, XceiverClientManager xceiverClientManager,
       StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
@@ -220,26 +228,9 @@ public class ChunkGroupOutputStream extends OutputStream {
 
   @Override
   public synchronized void write(int b) throws IOException {
-    checkNotClosed();
-
-    if (streamEntries.size() <= currentStreamIndex) {
-      Preconditions.checkNotNull(omClient);
-      // allocate a new block, if a exception happens, log an error and
-      // throw exception to the caller directly, and the write fails.
-      try {
-        allocateNewBlock(currentStreamIndex);
-      } catch (IOException ioe) {
-        LOG.error("Allocate block fail when writing.");
-        throw ioe;
-      }
-    }
-    ChunkOutputStreamEntry entry = streamEntries.get(currentStreamIndex);
-    entry.write(b);
-    incrementBlockLength(currentStreamIndex, 1);
-    if (entry.getRemaining() <= 0) {
-      currentStreamIndex += 1;
-    }
-    byteOffset += 1;
+    byte[] buf = new byte[1];
+    buf[0] = (byte) b;
+    write(buf, 0, 1);
   }
 
   /**
@@ -258,7 +249,10 @@ public class ChunkGroupOutputStream extends OutputStream {
   public synchronized void write(byte[] b, int off, int len)
       throws IOException {
     checkNotClosed();
+    handleWrite(b, off, len);
+  }
 
+  private void handleWrite(byte[] b, int off, int len) throws IOException {
     if (b == null) {
       throw new NullPointerException();
     }
@@ -288,10 +282,21 @@ public class ChunkGroupOutputStream extends OutputStream {
       // still do a sanity check.
       Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
       ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex);
-      int writeLen = Math.min(len, (int)current.getRemaining());
-      current.write(b, off, writeLen);
+      int writeLen = Math.min(len, (int) current.getRemaining());
+      try {
+        current.write(b, off, writeLen);
+      } catch (IOException ioe) {
+        if (checkIfContainerIsClosed(ioe)) {
+          handleCloseContainerException(current, currentStreamIndex);
+          continue;
+        } else {
+          throw ioe;
+        }
+      }
       incrementBlockLength(currentStreamIndex, writeLen);
       if (current.getRemaining() <= 0) {
+        // since the current block is already written close the stream.
+        handleFlushOrClose(true);
         currentStreamIndex += 1;
       }
       len -= writeLen;
@@ -301,6 +306,90 @@ public class ChunkGroupOutputStream extends OutputStream {
   }
 
   /**
+   * It performs the following actions:
+   * a. Fetches the committed length of the current stream's block from the
+   *    datanode and records it in the location info list.
+   * b. Writes the data cached in the underlying buffer to the next stream.
+   *
+   * @param streamEntry StreamEntry
+   * @param streamIndex Index of the entry
+   * @throws IOException Throws IOexception if Write fails
+   */
+  private void handleCloseContainerException(ChunkOutputStreamEntry streamEntry,
+      int streamIndex) throws IOException {
+    // TODO : If the block is still not committed and is in the
+    // pending openBlock Map, it will return BLOCK_NOT_COMMITTED
+    // exception. We should handle this by retrying the same operation
+    // n times and update the OzoneManager with the actual block length
+    // written. At this point of time, we also need to allocate new blocks
+    // from a different container and may need to nullify
+    // all the remaining pre-allocated blocks in case they were
+    // pre-allocated on the same container which got closed now. This needs
+    // caching the closed container list on the client itself.
+    long committedLength = 0;
+    ByteBuffer buffer = streamEntry.getBuffer();
+    if (buffer == null) {
+      // the buffer here will be null only when closeContainerException is
+      // hit while calling putKey during close on chunkOutputStream.
+      // Since closeContainer auto commit pending keys, no need to do
+      // anything here.
+      return;
+    }
+
+    // In case where not a single chunk of data has been written to the Datanode
+    // yet. This block does not yet exist on the datanode but cached on the
+    // outputStream buffer. No need to call GetCommittedBlockLength here
+    // for this block associated with the stream here.
+    if (streamEntry.currentPosition >= chunkSize
+        || streamEntry.currentPosition != buffer.position()) {
+      ContainerProtos.GetCommittedBlockLengthResponseProto responseProto =
+          ContainerProtocolCalls
+              .getCommittedBlockLength(streamEntry.xceiverClient,
+                  streamEntry.blockID, requestID);
+      committedLength = responseProto.getBlockLength();
+      // update the length of the current stream
+      locationInfoList.get(streamIndex).setLength(committedLength);
+    }
+
+    if (buffer.position() > 0) {
+      // If the data is still cached in the underlying stream, we need to
+      // allocate new block and write this data in the datanode. The cached
+      // data in the buffer does not exceed chunkSize.
+      Preconditions.checkState(buffer.position() < chunkSize);
+      currentStreamIndex += 1;
+      // readjust the byteOffset value to the length actually been written.
+      byteOffset -= buffer.position();
+      handleWrite(buffer.array(), 0, buffer.position());
+    }
+
+    // just clean up the current stream. Since the container is already closed,
+    // it will be auto committed. No need to call close again here.
+    streamEntry.cleanup();
+    // This case will arise when while writing the first chunk itself fails.
+    // In such case, the current block associated with the stream has no data
+    // written. Remove it from the current stream list.
+    if (committedLength == 0) {
+      streamEntries.remove(streamIndex);
+      locationInfoList.remove(streamIndex);
+      Preconditions.checkArgument(currentStreamIndex != 0);
+      currentStreamIndex -= 1;
+    }
+  }
+
+  private boolean checkIfContainerIsClosed(IOException ioe) {
+    // ofNullable: a cause-less IOException must give false, not an NPE.
+    return Optional.ofNullable(ioe.getCause())
+        .filter(e -> e instanceof StorageContainerException)
+        .map(e -> ((StorageContainerException) e).getResult())
+        .filter(result -> result == Result.CLOSED_CONTAINER_IO).isPresent();
+  }
+
+  private long getKeyLength() {
+    return locationInfoList.parallelStream().mapToLong(e -> e.getLength())
+        .sum();
+  }
+
+  /**
    * Contact OM to get a new block. Set the new block with the index (e.g.
    * first block has index = 0, second has index = 1 etc.)
    *
@@ -317,11 +406,41 @@ public class ChunkGroupOutputStream extends OutputStream {
   @Override
   public synchronized void flush() throws IOException {
     checkNotClosed();
+    handleFlushOrClose(false);
+  }
+
+  /**
+   * Close or Flush the latest outputStream.
+   * @param close Flag which decides whether to call close or flush on the
+   *              outputStream.
+   * @throws IOException In case, flush or close fails with exception.
+   */
+  private void handleFlushOrClose(boolean close) throws IOException {
     if (streamEntries.size() == 0) {
       return;
     }
-    for (int i = 0; i <= currentStreamIndex; i++) {
-      streamEntries.get(i).flush();
+    int size = streamEntries.size();
+    int streamIndex =
+        currentStreamIndex >= size ? size - 1 : currentStreamIndex;
+    ChunkOutputStreamEntry entry = streamEntries.get(streamIndex);
+    if (entry != null) {
+      try {
+        if (close) {
+          entry.close();
+        } else {
+          entry.flush();
+        }
+      } catch (IOException ioe) {
+        if (checkIfContainerIsClosed(ioe)) {
+          // This call will allocate a new streamEntry and write the data.
+          // Close needs to be retried on the newly allocated streamEntry as
+          // well.
+          handleCloseContainerException(entry, streamIndex);
+          handleFlushOrClose(close);
+        } else {
+          throw ioe;
+        }
+      }
     }
   }
 
@@ -336,16 +455,11 @@ public class ChunkGroupOutputStream extends OutputStream {
       return;
     }
     closed = true;
-    for (ChunkOutputStreamEntry entry : streamEntries) {
-      if (entry != null) {
-        entry.close();
-      }
-    }
+    handleFlushOrClose(true);
     if (keyArgs != null) {
       // in test, this could be null
-      long length =
-          locationInfoList.parallelStream().mapToLong(e -> e.getLength()).sum();
-      Preconditions.checkState(byteOffset == length);
+      Preconditions.checkState(streamEntries.size() == locationInfoList.size());
+      Preconditions.checkState(byteOffset == getKeyLength());
       keyArgs.setDataSize(byteOffset);
       keyArgs.setLocationInfoList(locationInfoList);
       omClient.commitKey(keyArgs, openID);
@@ -506,6 +620,23 @@ public class ChunkGroupOutputStream extends OutputStream {
         this.outputStream.close();
       }
     }
+
+    ByteBuffer getBuffer() throws IOException {
+      if (this.outputStream instanceof ChunkOutputStream) {
+        ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
+        return out.getBuffer();
+      }
+      throw new IOException("Invalid Output Stream for Key: " + key);
+    }
+
+    public void cleanup() {
+      checkStream();
+      if (this.outputStream instanceof ChunkOutputStream) {
+        ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
+        out.cleanup();
+      }
+    }
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3974427f/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index 3603964..f6e4265 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -125,19 +125,16 @@ public final class OmKeyInfo {
     OmKeyLocationInfoGroup keyLocationInfoGroup = getLatestVersionLocations();
     List<OmKeyLocationInfo> currentList =
         keyLocationInfoGroup.getLocationList();
-    Preconditions.checkNotNull(keyLocationInfoGroup);
-    Preconditions.checkState(locationInfoList.size() <= currentList.size());
-    for (OmKeyLocationInfo current : currentList) {
-      // For Versioning, while committing the key for the newer version,
-      // we just need to update the lengths for new blocks. Need to iterate over
-      // and find the new blocks added in the latest version.
-      for (OmKeyLocationInfo info : locationInfoList) {
-        if (info.getBlockID().equals(current.getBlockID())) {
-          current.setLength(info.getLength());
-          break;
-        }
-      }
-    }
+    List<OmKeyLocationInfo> latestVersionList =
+        keyLocationInfoGroup.getBlocksLatestVersionOnly();
+    // Updates the latest locationList in the latest version only with
+    // given locationInfoList here.
+    // TODO : The original allocated list and the updated list here may vary
+    // as the containers on the Datanode on which the blocks were pre allocated
+    // might get closed. The diff of blocks between these two lists here
+    // need to be garbage collected in case the ozone client dies.
+    currentList.removeAll(latestVersionList);
+    currentList.addAll(locationInfoList);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3974427f/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
new file mode 100644
index 0000000..e5ecd81
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Tests Close Container Exception handling by Ozone Client.
+ */
+public class TestCloseContainerHandlingByClient {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static OzoneClient client;
+  private static ObjectStore objectStore;
+  private static int chunkSize;
+  private static int blockSize;
+  private static String volumeName;
+  private static String bucketName;
+  private static String keyString;
+
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   * <p>
+   * Ozone is made active by setting OZONE_ENABLED = true and
+   * OZONE_HANDLER_TYPE_KEY = "distributed"
+   *
+   * @throws IOException
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
+        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
+    chunkSize = (int)OzoneConsts.MB;
+    blockSize = 4 * chunkSize;
+    conf.setInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, chunkSize);
+    conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, (4));
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(3).build();
+    cluster.waitForClusterToBeReady();
+    //the easiest way to create an open container is creating a key
+    client = OzoneClientFactory.getClient(conf);
+    objectStore = client.getObjectStore();
+    keyString = UUID.randomUUID().toString();
+    volumeName = "closecontainerexceptionhandlingtest";
+    bucketName = volumeName;
+    objectStore.createVolume(volumeName);
+    objectStore.getVolume(volumeName).createBucket(bucketName);
+  }
+
+  /**
+   * Shutdown MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  private static String fixedLengthString(String string, int length) {
+    return String.format("%1$"+length+ "s", string);
+  }
+
+  @Test
+  public void testBlockWritesWithFlushAndClose() throws Exception {
+    String keyName = "standalone";
+    OzoneOutputStream key =
+        createKey(keyName, ReplicationType.STAND_ALONE, 0);
+    // write data more than 1 chunk
+    byte[] data =
+        fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
+    key.write(data);
+
+    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    //get the name of a valid container
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setType(HddsProtos.ReplicationType.STAND_ALONE)
+        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
+        .build();
+
+    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE);
+    key.write(data);
+    key.flush();
+    key.close();
+    // read the key from OM again and match the length. The length will still
+    // be equal to the original data size.
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+    List<OmKeyLocationInfo> keyLocationInfos =
+        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
+    //we have written two blocks
+    Assert.assertEquals(2, keyLocationInfos.size());
+    OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0);
+    Assert.assertEquals(data.length - (data.length % chunkSize),
+        omKeyLocationInfo.getLength());
+    Assert.assertEquals(data.length + (data.length % chunkSize),
+        keyLocationInfos.get(1).getLength());
+    Assert.assertEquals(2 * data.length, keyInfo.getDataSize());
+
+    // The same data was written twice; String.concat returns a new String.
+    String dataString = new String(data);
+    dataString = dataString.concat(dataString);
+    validateData(keyName, dataString.getBytes());
+  }
+
+  @Test
+  public void testBlockWritesCloseConsistency() throws Exception {
+    String keyName = "standalone2";
+    OzoneOutputStream key = createKey(keyName, ReplicationType.STAND_ALONE, 0);
+    // write data more than 1 chunk
+    byte[] data =
+        fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
+    key.write(data);
+
+    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    //get the name of a valid container
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setType(HddsProtos.ReplicationType.STAND_ALONE)
+        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
+        .build();
+
+    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE);
+    key.close();
+    // read the key from OM again and match the length. The length will still
+    // be equal to the original data size.
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+    List<OmKeyLocationInfo> keyLocationInfos =
+        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
+    // Though we have written only one block initially, the close will hit
+    // closeContainerException and remaining data in the chunkOutputStream
+    // buffer will be copied into a different allocated block and will be
+    // committed.
+    Assert.assertEquals(2, keyLocationInfos.size());
+    OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0);
+    Assert.assertEquals(data.length - (data.length % chunkSize),
+        omKeyLocationInfo.getLength());
+    Assert.assertEquals(data.length % chunkSize,
+        keyLocationInfos.get(1).getLength());
+    Assert.assertEquals(data.length, keyInfo.getDataSize());
+    validateData(keyName, data);
+  }
+
+  @Test
+  public void testMultiBlockWrites() throws Exception {
+
+    String keyName = "standalone3";
+    OzoneOutputStream key =
+        createKey(keyName, ReplicationType.STAND_ALONE, (4 * blockSize));
+    ChunkGroupOutputStream groupOutputStream =
+        (ChunkGroupOutputStream) key.getOutputStream();
+    // With the initial size provided, it should have preallocated 4 blocks
+    Assert.assertEquals(4, groupOutputStream.getStreamEntries().size());
+    // write data more than 1 chunk
+    byte[] data = fixedLengthString(keyString, (3 * blockSize)).getBytes();
+    Assert.assertEquals(data.length, 3 * blockSize);
+    key.write(data);
+
+    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    //get the name of a valid container
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setType(HddsProtos.ReplicationType.STAND_ALONE)
+        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
+        .build();
+
+    waitForContainerClose(keyName, key,
+        HddsProtos.ReplicationType.STAND_ALONE);
+    // write 1 more block worth of data. It will fail and new block will be
+    // allocated
+    key.write(fixedLengthString(keyString, blockSize).getBytes());
+
+    key.close();
+    // read the key from OM again and match the length. The length will still
+    // be equal to the original data size.
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+    List<OmKeyLocationInfo> keyLocationInfos =
+        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
+    // Though we have written only block initially, the close will hit
+    // closeContainerException and remaining data in the chunkOutputStream
+    // buffer will be copied into a different allocated block and will be
+    // committed.
+    Assert.assertEquals(4, keyLocationInfos.size());
+    Assert.assertEquals(4 * blockSize, keyInfo.getDataSize());
+    for (OmKeyLocationInfo locationInfo : keyLocationInfos) {
+      Assert.assertEquals(blockSize, locationInfo.getLength());
+    }
+  }
+
+  @Test
+  public void testMultiBlockWrites2() throws Exception {
+
+    String keyName = "standalone4";
+    long dataLength = 0;
+    OzoneOutputStream key =
+        createKey(keyName, ReplicationType.STAND_ALONE, 4 * blockSize);
+    ChunkGroupOutputStream groupOutputStream =
+        (ChunkGroupOutputStream) key.getOutputStream();
+
+    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    // With the initial size provided, it should have pre allocated 4 blocks
+    Assert.assertEquals(4, groupOutputStream.getStreamEntries().size());
+    String dataString = fixedLengthString(keyString, (3 * blockSize));
+    byte[] data = dataString.getBytes();
+    key.write(data);
+    // 3 blocks' worth of data is completely written to the DataNode.
+    // Data of length half of chunkSize resides in the chunkOutput stream buffer
+    String dataString2 = fixedLengthString(keyString, chunkSize * 1 / 2);
+    key.write(dataString2.getBytes());
+    //get the name of a valid container
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setType(HddsProtos.ReplicationType.STAND_ALONE)
+        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
+        .build();
+
+    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE);
+
+    key.close();
+    // read the key from OM again and match the length. The length will still
+    // be equal to the original data size.
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+    List<OmKeyLocationInfo> keyLocationInfos =
+        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
+    // Though we have written only block initially, the close will hit
+    // closeContainerException and remaining data in the chunkOutputStream
+    // buffer will be copied into a different allocated block and will be
+    // committed.
+    Assert.assertEquals(4, keyLocationInfos.size());
+    dataLength = 3 * blockSize + (long) (0.5 * chunkSize);
+    Assert.assertEquals(dataLength, keyInfo.getDataSize());
+    validateData(keyName, dataString.concat(dataString2).getBytes());
+  }
+
+  /**
+   * Closes every container currently backing the given open key and blocks
+   * until each datanode of the corresponding pipelines reports the container
+   * as closed.
+   *
+   * @param keyName      name of the key that is still open for write
+   * @param outputStream stream returned when the key was created; it must
+   *                     wrap a {@link ChunkGroupOutputStream}
+   * @param type         replication type placed in the close command
+   * @throws Exception if the open key cannot be read, or a container does not
+   *                   reach the closed state while waiting
+   */
+  private void waitForContainerClose(String keyName,
+      OzoneOutputStream outputStream, HddsProtos.ReplicationType type)
+      throws Exception {
+
+    // Resolve the open-key record kept by OzoneManager for this client.
+    ChunkGroupOutputStream chunkGroupStream =
+        (ChunkGroupOutputStream) outputStream.getOutputStream();
+    int openId = chunkGroupStream.getOpenID();
+    OMMetadataManager omMetadata =
+        cluster.getOzoneManager().getMetadataManager();
+    String dbKey =
+        omMetadata.getKeyWithDBPrefix(volumeName, bucketName, keyName);
+    byte[] openKeyName = omMetadata.getOpenKeyNameBytes(dbKey, openId);
+    byte[] rawKeyInfo = omMetadata.get(openKeyName);
+    OmKeyInfo openKeyInfo = OmKeyInfo.getFromProtobuf(
+        OzoneManagerProtocolProtos.KeyInfo.parseFrom(rawKeyInfo));
+
+    // Collect the container of every block in the latest version.
+    List<OmKeyLocationInfo> blocks =
+        openKeyInfo.getKeyLocationVersions().get(0)
+            .getBlocksLatestVersionOnly();
+    List<Long> containerIds = new ArrayList<>();
+    for (OmKeyLocationInfo block : blocks) {
+      containerIds.add(block.getContainerID());
+    }
+    Assert.assertFalse(containerIds.isEmpty());
+
+    // First pass: queue a close command on every datanode of every container.
+    List<Pipeline> pipelines = new ArrayList<>();
+    for (long id : containerIds) {
+      Pipeline containerPipeline =
+          cluster.getStorageContainerManager().getScmContainerManager()
+              .getContainerWithPipeline(id).getPipeline();
+      pipelines.add(containerPipeline);
+      for (DatanodeDetails node : containerPipeline.getMachines()) {
+        Assert.assertFalse(
+            ContainerTestHelper.isContainerClosed(cluster, id, node));
+        // send the order to close the container
+        cluster.getStorageContainerManager().getScmNodeManager()
+            .addDatanodeCommand(node.getUuid(),
+                new CloseContainerCommand(id, type,
+                    containerPipeline.getId()));
+      }
+    }
+
+    // Second pass: wait for each datanode to report the container as closed.
+    for (int i = 0; i < containerIds.size(); i++) {
+      long id = containerIds.get(i);
+      for (DatanodeDetails node : pipelines.get(i).getMachines()) {
+        GenericTestUtils.waitFor(
+            () -> ContainerTestHelper.isContainerClosed(cluster, id, node),
+            500, 15 * 1000);
+        //double check if it's really closed (waitFor also throws an exception)
+        Assert.assertTrue(
+            ContainerTestHelper.isContainerClosed(cluster, id, node));
+      }
+    }
+
+  }
+
+  /**
+   * Creates a key in the test volume/bucket and returns its output stream.
+   * STAND_ALONE replication uses factor ONE; every other type uses THREE.
+   *
+   * @param keyName name of the key to create
+   * @param type    replication type for the key
+   * @param size    size passed to the createKey call
+   * @return the stream used to write the key's data
+   * @throws Exception if the volume, bucket or key operation fails
+   */
+  private OzoneOutputStream createKey(String keyName, ReplicationType type,
+      long size) throws Exception {
+    ReplicationFactor factor = ReplicationFactor.THREE;
+    if (type == ReplicationType.STAND_ALONE) {
+      factor = ReplicationFactor.ONE;
+    }
+    return objectStore.getVolume(volumeName).getBucket(bucketName)
+        .createKey(keyName, size, type, factor);
+  }
+
+  /**
+   * Reads the key back from the test volume/bucket and verifies that its
+   * first {@code data.length} bytes match {@code data}, by comparing digests
+   * computed with the {@code OzoneConsts.FILE_HASH} algorithm.
+   *
+   * @param keyName name of the key to read
+   * @param data    expected content of the key
+   * @throws Exception if the key cannot be read or the digest algorithm is
+   *                   unavailable
+   */
+  private void validateData(String keyName, byte[] data) throws Exception {
+    byte[] readData = new byte[data.length];
+    int bytesRead = 0;
+    // try-with-resources closes the stream even when a read fails.
+    try (OzoneInputStream is =
+        objectStore.getVolume(volumeName).getBucket(bucketName)
+            .readKey(keyName)) {
+      // A single read() may return fewer bytes than requested, so keep
+      // reading until the buffer is full or the stream is exhausted.
+      while (bytesRead < readData.length) {
+        int count = is.read(readData, bytesRead, readData.length - bytesRead);
+        if (count < 0) {
+          break;
+        }
+        bytesRead += count;
+      }
+    }
+    // A short read means the key holds less data than expected.
+    Assert.assertEquals(data.length, bytesRead);
+    MessageDigest sha1 = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+    sha1.update(data);
+    MessageDigest sha2 = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+    sha2.update(readData);
+    Assert.assertTrue(Arrays.equals(sha1.digest(), sha2.digest()));
+  }
+
+
+  /**
+   * Writes one and a half chunks to a RATIS key, closes the backing
+   * containers, writes the same data again and verifies that the key ends up
+   * with two blocks whose lengths and content cover both writes.
+   */
+  @Test
+  public void testBlockWriteViaRatis() throws Exception {
+    String keyName = "ratis";
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    byte[] data =
+        fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
+    key.write(data);
+
+    //get the name of a valid container
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
+        setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE)
+        .setKeyName(keyName).build();
+
+    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
+    // Again Write the Data. This will throw an exception which will be handled
+    // and new blocks will be allocated
+    key.write(data);
+    key.flush();
+    // The write will fail but exception will be handled and length will be
+    // updated correctly in OzoneManager once the stream is closed
+    key.close();
+    // read the key from OM again and match the length. The length must cover
+    // both writes.
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+    List<OmKeyLocationInfo> keyLocationInfos =
+        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
+    //we have written two blocks
+    Assert.assertEquals(2, keyLocationInfos.size());
+    OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0);
+    Assert.assertEquals(data.length - (data.length % chunkSize),
+        omKeyLocationInfo.getLength());
+    Assert.assertEquals(data.length + (data.length % chunkSize),
+        keyLocationInfos.get(1).getLength());
+    Assert.assertEquals(2 * data.length, keyInfo.getDataSize());
+    String dataString = new String(data);
+    // String is immutable: concat() returns a new value, which must be the
+    // one validated so that both writes are checked.
+    validateData(keyName, dataString.concat(dataString).getBytes());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3974427f/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index ca92110..dc166b5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -21,6 +21,10 @@ package org.apache.hadoop.ozone.container;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -604,4 +608,21 @@ public final class ContainerTestHelper {
   public static long getTestContainerID() {
     return Time.getUtcTime();
   }
+
+  /**
+   * Reports whether the given container is closed on the given datanode of
+   * the mini cluster.
+   *
+   * @param cluster     cluster whose datanode services are searched
+   * @param containerID id of the container to look up
+   * @param datanode    datanode on which the container state is checked
+   * @return the container's closed flag, or {@code false} when no matching
+   *         datanode service holds the container
+   */
+  public static boolean isContainerClosed(MiniOzoneCluster cluster,
+      long containerID, DatanodeDetails datanode) {
+    for (HddsDatanodeService service : cluster.getHddsDatanodes()) {
+      if (!datanode.equals(service.getDatanodeDetails())) {
+        continue;
+      }
+      Container found = service.getDatanodeStateMachine().getContainer()
+          .getContainerSet().getContainer(containerID);
+      if (found == null) {
+        continue;
+      }
+      ContainerData foundData = found.getContainerData();
+      return foundData.isClosed();
+    }
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3974427f/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
index f5dddee..0eb1677 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
@@ -39,12 +39,12 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.Assert;
 import org.junit.rules.ExpectedException;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -124,8 +124,8 @@ public class TestOmBlockVersioning {
     // 1st update, version 0
     OpenKeySession openKey = ozoneManager.openKey(keyArgs);
     // explicitly set the keyLocation list before committing the key.
-    keyArgs.setLocationInfoList(
-        openKey.getKeyInfo().getLatestVersionLocations().getLocationList());
+    keyArgs.setLocationInfoList(openKey.getKeyInfo().getLatestVersionLocations()
+        .getBlocksLatestVersionOnly());
     ozoneManager.commitKey(keyArgs, openKey.getId());
 
     OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs);
@@ -139,8 +139,8 @@ public class TestOmBlockVersioning {
     //OmKeyLocationInfo locationInfo =
     //    ozoneManager.allocateBlock(keyArgs, openKey.getId());
     // explicitly set the keyLocation list before committing the key.
-    keyArgs.setLocationInfoList(
-        openKey.getKeyInfo().getLatestVersionLocations().getLocationList());
+    keyArgs.setLocationInfoList(openKey.getKeyInfo().getLatestVersionLocations()
+        .getBlocksLatestVersionOnly());
     ozoneManager.commitKey(keyArgs, openKey.getId());
 
     keyInfo = ozoneManager.lookupKey(keyArgs);
@@ -150,10 +150,14 @@ public class TestOmBlockVersioning {
 
     // 3rd update, version 2
     openKey = ozoneManager.openKey(keyArgs);
+
     // this block will be appended to the latest version of version 2.
     OmKeyLocationInfo locationInfo =
         ozoneManager.allocateBlock(keyArgs, openKey.getId());
-    List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
+    List<OmKeyLocationInfo> locationInfoList =
+        openKey.getKeyInfo().getLatestVersionLocations()
+            .getBlocksLatestVersionOnly();
+    Assert.assertTrue(locationInfoList.size() == 1);
     locationInfoList.add(locationInfo);
     keyArgs.setLocationInfoList(locationInfoList);
     ozoneManager.commitKey(keyArgs, openKey.getId());


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org