You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2018/08/31 16:10:42 UTC
[16/47] hadoop git commit: HDDS-247. Handle CLOSED_CONTAINER_IO
exception in ozoneClient. Contributed by Shashikant Banerjee.
HDDS-247. Handle CLOSED_CONTAINER_IO exception in ozoneClient. Contributed by Shashikant Banerjee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3974427f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3974427f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3974427f
Branch: refs/heads/HDFS-12943
Commit: 3974427f67299496e13b04f0d006d367b705fcb5
Parents: 26c2a97
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Tue Aug 28 07:11:36 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Tue Aug 28 07:12:07 2018 +0530
----------------------------------------------------------------------
.../hdds/scm/storage/ChunkOutputStream.java | 28 +-
.../ozone/client/io/ChunkGroupOutputStream.java | 195 +++++++--
.../hadoop/ozone/om/helpers/OmKeyInfo.java | 23 +-
.../rpc/TestCloseContainerHandlingByClient.java | 408 +++++++++++++++++++
.../ozone/container/ContainerTestHelper.java | 21 +
.../hadoop/ozone/om/TestOmBlockVersioning.java | 16 +-
6 files changed, 630 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3974427f/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
index 779e636..7309434 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java
@@ -94,6 +94,10 @@ public class ChunkOutputStream extends OutputStream {
this.chunkIndex = 0;
}
+ public ByteBuffer getBuffer() {
+ return buffer;
+ }
+
@Override
public synchronized void write(int b) throws IOException {
checkOpen();
@@ -106,7 +110,8 @@ public class ChunkOutputStream extends OutputStream {
}
@Override
- public void write(byte[] b, int off, int len) throws IOException {
+ public synchronized void write(byte[] b, int off, int len)
+ throws IOException {
if (b == null) {
throw new NullPointerException();
}
@@ -143,24 +148,27 @@ public class ChunkOutputStream extends OutputStream {
@Override
public synchronized void close() throws IOException {
- if (xceiverClientManager != null && xceiverClient != null &&
- buffer != null) {
+ if (xceiverClientManager != null && xceiverClient != null
+ && buffer != null) {
+ if (buffer.position() > 0) {
+ writeChunkToContainer();
+ }
try {
- if (buffer.position() > 0) {
- writeChunkToContainer();
- }
putKey(xceiverClient, containerKeyData.build(), traceID);
} catch (IOException e) {
throw new IOException(
"Unexpected Storage Container Exception: " + e.toString(), e);
} finally {
- xceiverClientManager.releaseClient(xceiverClient);
- xceiverClientManager = null;
- xceiverClient = null;
- buffer = null;
+ cleanup();
}
}
+ }
+ public synchronized void cleanup() {
+ xceiverClientManager.releaseClient(xceiverClient);
+ xceiverClientManager = null;
+ xceiverClient = null;
+ buffer = null;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3974427f/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index 83b4dfd..988af07 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -46,8 +47,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
/**
* Maintaining a list of ChunkInputStream. Write based on offset.
@@ -111,6 +114,11 @@ public class ChunkGroupOutputStream extends OutputStream {
return streamEntries;
}
+ @VisibleForTesting
+ public int getOpenID() {
+ return openID;
+ }
+
public ChunkGroupOutputStream(
OpenKeySession handler, XceiverClientManager xceiverClientManager,
StorageContainerLocationProtocolClientSideTranslatorPB scmClient,
@@ -220,26 +228,9 @@ public class ChunkGroupOutputStream extends OutputStream {
@Override
public synchronized void write(int b) throws IOException {
- checkNotClosed();
-
- if (streamEntries.size() <= currentStreamIndex) {
- Preconditions.checkNotNull(omClient);
- // allocate a new block, if a exception happens, log an error and
- // throw exception to the caller directly, and the write fails.
- try {
- allocateNewBlock(currentStreamIndex);
- } catch (IOException ioe) {
- LOG.error("Allocate block fail when writing.");
- throw ioe;
- }
- }
- ChunkOutputStreamEntry entry = streamEntries.get(currentStreamIndex);
- entry.write(b);
- incrementBlockLength(currentStreamIndex, 1);
- if (entry.getRemaining() <= 0) {
- currentStreamIndex += 1;
- }
- byteOffset += 1;
+ byte[] buf = new byte[1];
+ buf[0] = (byte) b;
+ write(buf, 0, 1);
}
/**
@@ -258,7 +249,10 @@ public class ChunkGroupOutputStream extends OutputStream {
public synchronized void write(byte[] b, int off, int len)
throws IOException {
checkNotClosed();
+ handleWrite(b, off, len);
+ }
+ private void handleWrite(byte[] b, int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
}
@@ -288,10 +282,21 @@ public class ChunkGroupOutputStream extends OutputStream {
// still do a sanity check.
Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex);
- int writeLen = Math.min(len, (int)current.getRemaining());
- current.write(b, off, writeLen);
+ int writeLen = Math.min(len, (int) current.getRemaining());
+ try {
+ current.write(b, off, writeLen);
+ } catch (IOException ioe) {
+ if (checkIfContainerIsClosed(ioe)) {
+ handleCloseContainerException(current, currentStreamIndex);
+ continue;
+ } else {
+ throw ioe;
+ }
+ }
incrementBlockLength(currentStreamIndex, writeLen);
if (current.getRemaining() <= 0) {
+ // since the current block is already written close the stream.
+ handleFlushOrClose(true);
currentStreamIndex += 1;
}
len -= writeLen;
@@ -301,6 +306,90 @@ public class ChunkGroupOutputStream extends OutputStream {
}
/**
+   * It performs the following actions:
+   * a. Fetches the committed block length of the current stream from the
+   * datanode and records it in the location info list.
+   * b. Reads data from the underlying buffer and writes it to the next stream.
+   *
+   * @param streamEntry StreamEntry
+   * @param streamIndex Index of the entry
+   * @throws IOException Throws IOException if the write fails
+ */
+ private void handleCloseContainerException(ChunkOutputStreamEntry streamEntry,
+ int streamIndex) throws IOException {
+ // TODO : If the block is still not committed and is in the
+ // pending openBlock Map, it will return BLOCK_NOT_COMMITTED
+ // exception. We should handle this by retrying the same operation
+ // n times and update the OzoneManager with the actual block length
+ // written. At this point of time, we also need to allocate new blocks
+ // from a different container and may need to nullify
+ // all the remaining pre-allocated blocks in case they were
+ // pre-allocated on the same container which got closed now.This needs
+ // caching the closed container list on the client itself.
+ long committedLength = 0;
+ ByteBuffer buffer = streamEntry.getBuffer();
+ if (buffer == null) {
+ // the buffer here will be null only when closeContainerException is
+ // hit while calling putKey during close on chunkOutputStream.
+ // Since closeContainer auto commit pending keys, no need to do
+ // anything here.
+ return;
+ }
+
+ // In case where not a single chunk of data has been written to the Datanode
+ // yet. This block does not yet exist on the datanode but cached on the
+ // outputStream buffer. No need to call GetCommittedBlockLength here
+ // for this block associated with the stream here.
+ if (streamEntry.currentPosition >= chunkSize
+ || streamEntry.currentPosition != buffer.position()) {
+ ContainerProtos.GetCommittedBlockLengthResponseProto responseProto =
+ ContainerProtocolCalls
+ .getCommittedBlockLength(streamEntry.xceiverClient,
+ streamEntry.blockID, requestID);
+ committedLength = responseProto.getBlockLength();
+ // update the length of the current stream
+ locationInfoList.get(streamIndex).setLength(committedLength);
+ }
+
+ if (buffer.position() > 0) {
+ // If the data is still cached in the underlying stream, we need to
+ // allocate new block and write this data in the datanode. The cached
+ // data in the buffer does not exceed chunkSize.
+ Preconditions.checkState(buffer.position() < chunkSize);
+ currentStreamIndex += 1;
+ // readjust the byteOffset value to the length actually been written.
+ byteOffset -= buffer.position();
+ handleWrite(buffer.array(), 0, buffer.position());
+ }
+
+ // just clean up the current stream. Since the container is already closed,
+ // it will be auto committed. No need to call close again here.
+ streamEntry.cleanup();
+ // This case will arise when while writing the first chunk itself fails.
+ // In such case, the current block associated with the stream has no data
+ // written. Remove it from the current stream list.
+ if (committedLength == 0) {
+ streamEntries.remove(streamIndex);
+ locationInfoList.remove(streamIndex);
+ Preconditions.checkArgument(currentStreamIndex != 0);
+ currentStreamIndex -= 1;
+ }
+ }
+
+  private boolean checkIfContainerIsClosed(IOException ioe) {
+    return Optional.ofNullable(ioe.getCause()) // cause may be null
+        .filter(e -> e instanceof StorageContainerException)
+        .map(e -> (StorageContainerException) e)
+        .filter(sce -> sce.getResult() == Result.CLOSED_CONTAINER_IO)
+        .isPresent();
+  }
+
+  private long getKeyLength() {
+    // Sequential on purpose: a handful of blocks does not merit fork/join.
+    return locationInfoList.stream().mapToLong(e -> e.getLength()).sum();
+  }
+
+ /**
* Contact OM to get a new block. Set the new block with the index (e.g.
* first block has index = 0, second has index = 1 etc.)
*
@@ -317,11 +406,41 @@ public class ChunkGroupOutputStream extends OutputStream {
@Override
public synchronized void flush() throws IOException {
checkNotClosed();
+ handleFlushOrClose(false);
+ }
+
+ /**
+ * Close or Flush the latest outputStream.
+ * @param close Flag which decides whether to call close or flush on the
+ * outputStream.
+ * @throws IOException In case, flush or close fails with exception.
+ */
+ private void handleFlushOrClose(boolean close) throws IOException {
if (streamEntries.size() == 0) {
return;
}
- for (int i = 0; i <= currentStreamIndex; i++) {
- streamEntries.get(i).flush();
+ int size = streamEntries.size();
+ int streamIndex =
+ currentStreamIndex >= size ? size - 1 : currentStreamIndex;
+ ChunkOutputStreamEntry entry = streamEntries.get(streamIndex);
+ if (entry != null) {
+ try {
+ if (close) {
+ entry.close();
+ } else {
+ entry.flush();
+ }
+ } catch (IOException ioe) {
+ if (checkIfContainerIsClosed(ioe)) {
+ // This call will allocate a new streamEntry and write the Data.
+ // Close needs to be retried on the newly allocated streamEntry as
+          // well.
+ handleCloseContainerException(entry, streamIndex);
+ handleFlushOrClose(close);
+ } else {
+ throw ioe;
+ }
+ }
}
}
@@ -336,16 +455,11 @@ public class ChunkGroupOutputStream extends OutputStream {
return;
}
closed = true;
- for (ChunkOutputStreamEntry entry : streamEntries) {
- if (entry != null) {
- entry.close();
- }
- }
+ handleFlushOrClose(true);
if (keyArgs != null) {
// in test, this could be null
- long length =
- locationInfoList.parallelStream().mapToLong(e -> e.getLength()).sum();
- Preconditions.checkState(byteOffset == length);
+ Preconditions.checkState(streamEntries.size() == locationInfoList.size());
+ Preconditions.checkState(byteOffset == getKeyLength());
keyArgs.setDataSize(byteOffset);
keyArgs.setLocationInfoList(locationInfoList);
omClient.commitKey(keyArgs, openID);
@@ -506,6 +620,23 @@ public class ChunkGroupOutputStream extends OutputStream {
this.outputStream.close();
}
}
+
+ ByteBuffer getBuffer() throws IOException {
+ if (this.outputStream instanceof ChunkOutputStream) {
+ ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
+ return out.getBuffer();
+ }
+ throw new IOException("Invalid Output Stream for Key: " + key);
+ }
+
+ public void cleanup() {
+ checkStream();
+ if (this.outputStream instanceof ChunkOutputStream) {
+ ChunkOutputStream out = (ChunkOutputStream) this.outputStream;
+ out.cleanup();
+ }
+ }
+
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3974427f/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index 3603964..f6e4265 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -125,19 +125,16 @@ public final class OmKeyInfo {
OmKeyLocationInfoGroup keyLocationInfoGroup = getLatestVersionLocations();
List<OmKeyLocationInfo> currentList =
keyLocationInfoGroup.getLocationList();
- Preconditions.checkNotNull(keyLocationInfoGroup);
- Preconditions.checkState(locationInfoList.size() <= currentList.size());
- for (OmKeyLocationInfo current : currentList) {
- // For Versioning, while committing the key for the newer version,
- // we just need to update the lengths for new blocks. Need to iterate over
- // and find the new blocks added in the latest version.
- for (OmKeyLocationInfo info : locationInfoList) {
- if (info.getBlockID().equals(current.getBlockID())) {
- current.setLength(info.getLength());
- break;
- }
- }
- }
+ List<OmKeyLocationInfo> latestVersionList =
+ keyLocationInfoGroup.getBlocksLatestVersionOnly();
+ // Updates the latest locationList in the latest version only with
+ // given locationInfoList here.
+ // TODO : The original allocated list and the updated list here may vary
+ // as the containers on the Datanode on which the blocks were pre allocated
+ // might get closed. The diff of blocks between these two lists here
+ // need to be garbage collected in case the ozone client dies.
+ currentList.removeAll(latestVersionList);
+ currentList.addAll(locationInfoList);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3974427f/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
new file mode 100644
index 0000000..e5ecd81
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.ChunkGroupOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Tests Close Container Exception handling by Ozone Client.
+ */
+public class TestCloseContainerHandlingByClient {
+
+ private static MiniOzoneCluster cluster;
+ private static OzoneConfiguration conf;
+ private static OzoneClient client;
+ private static ObjectStore objectStore;
+ private static int chunkSize;
+ private static int blockSize;
+ private static String volumeName;
+ private static String bucketName;
+ private static String keyString;
+
+
+ /**
+   * Create a MiniOzoneCluster for testing.
+ * <p>
+ * Ozone is made active by setting OZONE_ENABLED = true and
+ * OZONE_HANDLER_TYPE_KEY = "distributed"
+ *
+ * @throws IOException
+ */
+ @BeforeClass
+ public static void init() throws Exception {
+ conf = new OzoneConfiguration();
+ conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
+ OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
+ chunkSize = (int)OzoneConsts.MB;
+ blockSize = 4 * chunkSize;
+ conf.setInt(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY, chunkSize);
+ conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB, (4));
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(3).build();
+ cluster.waitForClusterToBeReady();
+ //the easiest way to create an open container is creating a key
+ client = OzoneClientFactory.getClient(conf);
+ objectStore = client.getObjectStore();
+ keyString = UUID.randomUUID().toString();
+ volumeName = "closecontainerexceptionhandlingtest";
+ bucketName = volumeName;
+ objectStore.createVolume(volumeName);
+ objectStore.getVolume(volumeName).createBucket(bucketName);
+ }
+
+ /**
+   * Shutdown MiniOzoneCluster.
+ */
+ @AfterClass
+ public static void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ private static String fixedLengthString(String string, int length) {
+ return String.format("%1$"+length+ "s", string);
+ }
+
+  @Test
+  public void testBlockWritesWithFlushAndClose() throws Exception {
+    String keyName = "standalone";
+    OzoneOutputStream key =
+        createKey(keyName, ReplicationType.STAND_ALONE, 0);
+    // write data more than 1 chunk
+    byte[] data =
+        fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
+    key.write(data);
+
+    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    // key args used below to look the committed key up in OM
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setType(HddsProtos.ReplicationType.STAND_ALONE)
+        .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
+        .build();
+
+    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE);
+    key.write(data);
+    key.flush();
+    key.close();
+    // read the key from OM again and match the length. The data was written
+    // twice, so the key size must be twice the size of the data array.
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+    List<OmKeyLocationInfo> keyLocationInfos =
+        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
+    //we have written two blocks
+    Assert.assertEquals(2, keyLocationInfos.size());
+    OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0);
+    Assert.assertEquals(data.length - (data.length % chunkSize),
+        omKeyLocationInfo.getLength());
+    Assert.assertEquals(data.length + (data.length % chunkSize),
+        keyLocationInfos.get(1).getLength());
+    Assert.assertEquals(2 * data.length, keyInfo.getDataSize());
+
+    // Same data written twice; concat returns a new String, so reassign it.
+    String dataString = new String(data);
+    dataString = dataString.concat(dataString);
+    validateData(keyName, dataString.getBytes());
+  }
+
+ @Test
+ public void testBlockWritesCloseConsistency() throws Exception {
+ String keyName = "standalone2";
+ OzoneOutputStream key = createKey(keyName, ReplicationType.STAND_ALONE, 0);
+ // write data more than 1 chunk
+ byte[] data =
+ fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
+ key.write(data);
+
+ Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+ //get the name of a valid container
+ OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setType(HddsProtos.ReplicationType.STAND_ALONE)
+ .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
+ .build();
+
+ waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE);
+ key.close();
+    // read the key from OM again and match the length. The length will still
+    // be equal to the original data size.
+ OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+ List<OmKeyLocationInfo> keyLocationInfos =
+ keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
+    // Though we have written only one block initially, the close will hit
+ // closeContainerException and remaining data in the chunkOutputStream
+ // buffer will be copied into a different allocated block and will be
+ // committed.
+ Assert.assertEquals(2, keyLocationInfos.size());
+ OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0);
+ Assert.assertEquals(data.length - (data.length % chunkSize),
+ omKeyLocationInfo.getLength());
+ Assert.assertEquals(data.length % chunkSize,
+ keyLocationInfos.get(1).getLength());
+ Assert.assertEquals(data.length, keyInfo.getDataSize());
+ validateData(keyName, data);
+ }
+
+ @Test
+ public void testMultiBlockWrites() throws Exception {
+
+ String keyName = "standalone3";
+ OzoneOutputStream key =
+ createKey(keyName, ReplicationType.STAND_ALONE, (4 * blockSize));
+ ChunkGroupOutputStream groupOutputStream =
+ (ChunkGroupOutputStream) key.getOutputStream();
+    // With the initial size provided, it should have preallocated 4 blocks
+ Assert.assertEquals(4, groupOutputStream.getStreamEntries().size());
+ // write data more than 1 chunk
+ byte[] data = fixedLengthString(keyString, (3 * blockSize)).getBytes();
+ Assert.assertEquals(data.length, 3 * blockSize);
+ key.write(data);
+
+ Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+ //get the name of a valid container
+ OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setType(HddsProtos.ReplicationType.STAND_ALONE)
+ .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
+ .build();
+
+ waitForContainerClose(keyName, key,
+ HddsProtos.ReplicationType.STAND_ALONE);
+ // write 1 more block worth of data. It will fail and new block will be
+ // allocated
+ key.write(fixedLengthString(keyString, blockSize).getBytes());
+
+ key.close();
+ // read the key from OM again and match the length.The length will still
+ // be the equal to the original data size.
+ OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+ List<OmKeyLocationInfo> keyLocationInfos =
+ keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
+ // Though we have written only block initially, the close will hit
+ // closeContainerException and remaining data in the chunkOutputStream
+ // buffer will be copied into a different allocated block and will be
+ // committed.
+ Assert.assertEquals(4, keyLocationInfos.size());
+ Assert.assertEquals(4 * blockSize, keyInfo.getDataSize());
+ for (OmKeyLocationInfo locationInfo : keyLocationInfos) {
+ Assert.assertEquals(blockSize, locationInfo.getLength());
+ }
+ }
+
+ @Test
+ public void testMultiBlockWrites2() throws Exception {
+
+ String keyName = "standalone4";
+ long dataLength = 0;
+ OzoneOutputStream key =
+ createKey(keyName, ReplicationType.STAND_ALONE, 4 * blockSize);
+ ChunkGroupOutputStream groupOutputStream =
+ (ChunkGroupOutputStream) key.getOutputStream();
+
+ Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+ // With the initial size provided, it should have pre allocated 4 blocks
+ Assert.assertEquals(4, groupOutputStream.getStreamEntries().size());
+ String dataString = fixedLengthString(keyString, (3 * blockSize));
+ byte[] data = dataString.getBytes();
+ key.write(data);
+ // 3 block are completely written to the DataNode in 3 blocks.
+ // Data of length half of chunkSize resides in the chunkOutput stream buffer
+ String dataString2 = fixedLengthString(keyString, chunkSize * 1 / 2);
+ key.write(dataString2.getBytes());
+ //get the name of a valid container
+ OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setType(HddsProtos.ReplicationType.STAND_ALONE)
+ .setFactor(HddsProtos.ReplicationFactor.ONE).setKeyName(keyName)
+ .build();
+
+ waitForContainerClose(keyName, key, HddsProtos.ReplicationType.STAND_ALONE);
+
+ key.close();
+ // read the key from OM again and match the length.The length will still
+ // be the equal to the original data size.
+ OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+ List<OmKeyLocationInfo> keyLocationInfos =
+ keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
+ // Though we have written only block initially, the close will hit
+ // closeContainerException and remaining data in the chunkOutputStream
+ // buffer will be copied into a different allocated block and will be
+ // committed.
+ Assert.assertEquals(4, keyLocationInfos.size());
+ dataLength = 3 * blockSize + (long) (0.5 * chunkSize);
+ Assert.assertEquals(dataLength, keyInfo.getDataSize());
+ validateData(keyName, dataString.concat(dataString2).getBytes());
+ }
+
+ private void waitForContainerClose(String keyName,
+ OzoneOutputStream outputStream, HddsProtos.ReplicationType type)
+ throws Exception {
+
+ ChunkGroupOutputStream groupOutputStream =
+ (ChunkGroupOutputStream) outputStream.getOutputStream();
+ int clientId = groupOutputStream.getOpenID();
+ OMMetadataManager metadataManager =
+ cluster.getOzoneManager().getMetadataManager();
+ String objectKey =
+ metadataManager.getKeyWithDBPrefix(volumeName, bucketName, keyName);
+ byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, clientId);
+ byte[] openKeyData = metadataManager.get(openKey);
+ OmKeyInfo keyInfo = OmKeyInfo.getFromProtobuf(
+ OzoneManagerProtocolProtos.KeyInfo.parseFrom(openKeyData));
+ List<OmKeyLocationInfo> locationInfoList =
+ keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
+ List<Long> containerIdList = new ArrayList<>();
+ List<Pipeline> pipelineList = new ArrayList<>();
+ for (OmKeyLocationInfo info : locationInfoList) {
+ containerIdList.add(info.getContainerID());
+ }
+ Assert.assertTrue(!containerIdList.isEmpty());
+ for (long containerID : containerIdList) {
+ Pipeline pipeline =
+ cluster.getStorageContainerManager().getScmContainerManager()
+ .getContainerWithPipeline(containerID).getPipeline();
+ pipelineList.add(pipeline);
+ List<DatanodeDetails> datanodes = pipeline.getMachines();
+ for (DatanodeDetails details : datanodes) {
+ Assert.assertFalse(ContainerTestHelper
+ .isContainerClosed(cluster, containerID, details));
+ // send the order to close the container
+ cluster.getStorageContainerManager().getScmNodeManager()
+ .addDatanodeCommand(details.getUuid(),
+ new CloseContainerCommand(containerID, type, pipeline.getId()));
+ }
+ }
+
+ int index = 0;
+ for (long containerID : containerIdList) {
+ Pipeline pipeline = pipelineList.get(index);
+ List<DatanodeDetails> datanodes = pipeline.getMachines();
+ for (DatanodeDetails datanodeDetails : datanodes) {
+ GenericTestUtils.waitFor(() -> ContainerTestHelper
+ .isContainerClosed(cluster, containerID, datanodeDetails), 500,
+ 15 * 1000);
+ //double check if it's really closed (waitFor also throws an exception)
+ Assert.assertTrue(ContainerTestHelper
+ .isContainerClosed(cluster, containerID, datanodeDetails));
+ }
+ index++;
+ }
+
+ }
+
+ private OzoneOutputStream createKey(String keyName, ReplicationType type,
+ long size) throws Exception {
+ ReplicationFactor factor =
+ type == ReplicationType.STAND_ALONE ? ReplicationFactor.ONE :
+ ReplicationFactor.THREE;
+ return objectStore.getVolume(volumeName).getBucket(bucketName)
+ .createKey(keyName, size, type, factor);
+ }
+
+  private void validateData(String keyName, byte[] data) throws Exception {
+    byte[] readData = new byte[data.length];
+    // try-with-resources: the stream is closed even if the read throws.
+    try (OzoneInputStream is = objectStore.getVolume(volumeName)
+        .getBucket(bucketName).readKey(keyName)) {
+      is.read(readData);
+    }
+    MessageDigest sha1 = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+    sha1.update(data);
+    MessageDigest sha2 = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+    sha2.update(readData);
+    Assert.assertTrue(Arrays.equals(sha1.digest(), sha2.digest()));
+  }
+
+
+  @Test
+  public void testBlockWriteViaRatis() throws Exception {
+    String keyName = "ratis";
+    OzoneOutputStream key = createKey(keyName, ReplicationType.RATIS, 0);
+    byte[] data =
+        fixedLengthString(keyString, chunkSize + chunkSize / 2).getBytes();
+    key.write(data);
+
+    // key args used below to look the committed key up in OM
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName).
+        setBucketName(bucketName).setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.THREE)
+        .setKeyName(keyName).build();
+
+    Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream);
+    waitForContainerClose(keyName, key, HddsProtos.ReplicationType.RATIS);
+    // Again Write the Data. This will throw an exception which will be handled
+    // and new blocks will be allocated
+    key.write(data);
+    key.flush();
+    // The write will fail but exception will be handled and length will be
+    // updated correctly in OzoneManager once the stream is closed
+    key.close();
+    // read the key from OM again and match the length. The data was written
+    // twice, so the key size must be twice the size of the data array.
+    OmKeyInfo keyInfo = cluster.getOzoneManager().lookupKey(keyArgs);
+    List<OmKeyLocationInfo> keyLocationInfos =
+        keyInfo.getKeyLocationVersions().get(0).getBlocksLatestVersionOnly();
+    //we have written two blocks
+    Assert.assertEquals(2, keyLocationInfos.size());
+    OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(0);
+    Assert.assertEquals(data.length - (data.length % chunkSize),
+        omKeyLocationInfo.getLength());
+    Assert.assertEquals(data.length + (data.length % chunkSize),
+        keyLocationInfos.get(1).getLength());
+    Assert.assertEquals(2 * data.length, keyInfo.getDataSize());
+    String dataString = new String(data);
+    dataString = dataString.concat(dataString); // concat returns a new String
+    validateData(keyName, dataString.getBytes());
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3974427f/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index ca92110..dc166b5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -21,6 +21,10 @@ package org.apache.hadoop.ozone.container;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.hdds.client.BlockID;
@@ -604,4 +608,21 @@ public final class ContainerTestHelper {
public static long getTestContainerID() {
return Time.getUtcTime();
}
+
+ /**
+  * Reports whether the given container is closed on the given datanode.
+  *
+  * Walks the datanode services of the cluster, picks those whose datanode
+  * details equal {@code datanode}, and looks the container up in that
+  * service's container set.
+  *
+  * @param cluster the mini cluster whose datanode services are searched
+  * @param containerID id of the container to look up
+  * @param datanode details of the datanode expected to hold the container
+  * @return the {@code isClosed()} state of the container data when the
+  *         container is found on a matching datanode; {@code false} when no
+  *         datanode matches or the container is not present there
+  */
+ public static boolean isContainerClosed(MiniOzoneCluster cluster,
+     long containerID, DatanodeDetails datanode) {
+   for (HddsDatanodeService service : cluster.getHddsDatanodes()) {
+     // Only the service running on the requested datanode is of interest.
+     if (!datanode.equals(service.getDatanodeDetails())) {
+       continue;
+     }
+     Container replica = service.getDatanodeStateMachine().getContainer()
+         .getContainerSet().getContainer(containerID);
+     // Container not present on this service: keep scanning the rest.
+     if (replica == null) {
+       continue;
+     }
+     ContainerData replicaData = replica.getContainerData();
+     return replicaData.isClosed();
+   }
+   return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3974427f/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
index f5dddee..0eb1677 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
@@ -39,12 +39,12 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.Assert;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -124,8 +124,8 @@ public class TestOmBlockVersioning {
// 1st update, version 0
OpenKeySession openKey = ozoneManager.openKey(keyArgs);
// explicitly set the keyLocation list before committing the key.
- keyArgs.setLocationInfoList(
- openKey.getKeyInfo().getLatestVersionLocations().getLocationList());
+ keyArgs.setLocationInfoList(openKey.getKeyInfo().getLatestVersionLocations()
+ .getBlocksLatestVersionOnly());
ozoneManager.commitKey(keyArgs, openKey.getId());
OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs);
@@ -139,8 +139,8 @@ public class TestOmBlockVersioning {
//OmKeyLocationInfo locationInfo =
// ozoneManager.allocateBlock(keyArgs, openKey.getId());
// explicitly set the keyLocation list before committing the key.
- keyArgs.setLocationInfoList(
- openKey.getKeyInfo().getLatestVersionLocations().getLocationList());
+ keyArgs.setLocationInfoList(openKey.getKeyInfo().getLatestVersionLocations()
+ .getBlocksLatestVersionOnly());
ozoneManager.commitKey(keyArgs, openKey.getId());
keyInfo = ozoneManager.lookupKey(keyArgs);
@@ -150,10 +150,14 @@ public class TestOmBlockVersioning {
// 3rd update, version 2
openKey = ozoneManager.openKey(keyArgs);
+
// this block will be appended to the latest version of version 2.
OmKeyLocationInfo locationInfo =
ozoneManager.allocateBlock(keyArgs, openKey.getId());
- List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
+ List<OmKeyLocationInfo> locationInfoList =
+ openKey.getKeyInfo().getLatestVersionLocations()
+ .getBlocksLatestVersionOnly();
+ Assert.assertTrue(locationInfoList.size() == 1);
locationInfoList.add(locationInfo);
keyArgs.setLocationInfoList(locationInfoList);
ozoneManager.commitKey(keyArgs, openKey.getId());
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org