You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bh...@apache.org on 2018/07/09 20:33:24 UTC
[15/37] hadoop git commit: HDDS-160:Refactor KeyManager,
ChunkManager. Contributed by Bharat Viswanadham
HDDS-160:Refactor KeyManager, ChunkManager. Contributed by Bharat Viswanadham
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ca192cb7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ca192cb7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ca192cb7
Branch: refs/heads/trunk
Commit: ca192cb7c9d76163391e88461716938fdb41c4d3
Parents: 998e285
Author: Bharat Viswanadham <bh...@apache.org>
Authored: Fri Jun 15 14:35:33 2018 -0700
Committer: Bharat Viswanadham <bh...@apache.org>
Committed: Fri Jun 15 14:35:33 2018 -0700
----------------------------------------------------------------------
.../common/interfaces/ChunkManager.java | 2 +-
.../container/keyvalue/ChunkManagerImpl.java | 240 +++++++++++++++
.../container/keyvalue/KeyManagerImpl.java | 188 ++++++++++++
.../container/keyvalue/helpers/ChunkUtils.java | 295 +++++++++++++++++++
.../container/keyvalue/helpers/KeyUtils.java | 37 ++-
.../keyvalue/interfaces/ChunkManager.java | 80 +++++
.../keyvalue/interfaces/KeyManager.java | 76 +++++
.../keyvalue/TestChunkManagerImpl.java | 237 +++++++++++++++
.../container/keyvalue/TestKeyManagerImpl.java | 179 +++++++++++
9 files changed, 1331 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca192cb7/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java
index c58fb9d..9de84da 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java
@@ -70,4 +70,4 @@ public interface ChunkManager {
*/
void shutdown();
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca192cb7/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/ChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/ChunkManagerImpl.java
new file mode 100644
index 0000000..6ee0fd3
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/ChunkManagerImpl.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
+import org.apache.hadoop.ozone.container.common.impl.ChunkLayOutVersion;
+import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .Result.CONTAINER_INTERNAL_ERROR;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .Result.NO_SUCH_ALGORITHM;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
+
+/**
+ * This class is for performing chunk related operations.
+ */
+public class ChunkManagerImpl implements ChunkManager {
+ static final Logger LOG = LoggerFactory.getLogger(ChunkManagerImpl.class);
+
+ /**
+  * Writes a given chunk.
+  *
+  * The action taken depends on {@code stage}:
+  * WRITE_DATA writes the data to a temporary chunk file, COMMIT_DATA moves
+  * the temporary chunk file over the actual chunk file and updates the
+  * container counters, and COMBINED writes directly to the chunk file and
+  * updates the container counters.
+  *
+  * @param container - Container for the chunk
+  * @param blockID - ID of the block
+  * @param info - ChunkInfo
+  * @param data - data of the chunk
+  * @param stage - Stage of the Chunk operation
+  * @throws StorageContainerException if validation or the write fails; other
+  * failures are wrapped with NO_SUCH_ALGORITHM or CONTAINER_INTERNAL_ERROR.
+  */
+ public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
+     byte[] data, ContainerProtos.Stage stage)
+     throws StorageContainerException {
+
+   try {
+
+     KeyValueContainerData containerData = (KeyValueContainerData) container
+         .getContainerData();
+
+     File chunkFile = ChunkUtils.validateChunk(containerData, info);
+     File tmpChunkFile = getTmpChunkFile(chunkFile, info);
+
+     // One placeholder per argument, otherwise the tmp file is not printed.
+     LOG.debug(
+         "writing chunk:{} chunk stage:{} chunk file:{} tmp chunk file:{}",
+         info.getChunkName(), stage, chunkFile, tmpChunkFile);
+
+     switch (stage) {
+     case WRITE_DATA:
+       // Initially writes to temporary chunk file.
+       ChunkUtils.writeData(tmpChunkFile, info, data);
+       break;
+     case COMMIT_DATA:
+       // commit the data, means move chunk data from temporary chunk file
+       // to actual chunk file.
+       // The size difference must be taken before the move replaces the
+       // existing chunk file.
+       long sizeDiff = tmpChunkFile.length() - chunkFile.length();
+       commitChunk(tmpChunkFile, chunkFile);
+       containerData.incrBytesUsed(sizeDiff);
+       containerData.incrWriteCount();
+       containerData.incrWriteBytes(sizeDiff);
+       break;
+     case COMBINED:
+       // directly write to the chunk file
+       ChunkUtils.writeData(chunkFile, info, data);
+       containerData.incrBytesUsed(info.getLen());
+       containerData.incrWriteCount();
+       containerData.incrWriteBytes(info.getLen());
+       break;
+     default:
+       throw new IOException("Can not identify write operation.");
+     }
+   } catch (StorageContainerException ex) {
+     throw ex;
+   } catch (NoSuchAlgorithmException ex) {
+     // The exception is passed as the trailing Throwable argument so the
+     // logger prints its stack trace; no placeholder is needed for it.
+     LOG.error("write data failed.", ex);
+     throw new StorageContainerException("Internal error: ", ex,
+         NO_SUCH_ALGORITHM);
+   } catch (ExecutionException | IOException ex) {
+     LOG.error("write data failed.", ex);
+     throw new StorageContainerException("Internal error: ", ex,
+         CONTAINER_INTERNAL_ERROR);
+   } catch (InterruptedException e) {
+     // Restore the interrupt flag before translating the exception.
+     Thread.currentThread().interrupt();
+     LOG.error("write data failed.", e);
+     throw new StorageContainerException("Internal error: ", e,
+         CONTAINER_INTERNAL_ERROR);
+   }
+ }
+
+ /**
+  * Reads the data defined by a chunk.
+  *
+  * The chunk is only read when the container's layout version equals the
+  * latest chunk layout version; for any other layout version this method
+  * returns {@code null}.
+  *
+  * @param container - Container for the chunk
+  * @param blockID - ID of the block.
+  * @param info - ChunkInfo.
+  * @return byte array holding the chunk data, or {@code null} when the
+  * container layout version is not the latest one.
+  * @throws StorageContainerException if the chunk cannot be read; other
+  * failures are wrapped with NO_SUCH_ALGORITHM or CONTAINER_INTERNAL_ERROR.
+  * TODO: Right now we do not support partial reads and writes of chunks.
+  * TODO: Explore if we need to do that for ozone.
+  */
+ public byte[] readChunk(Container container, BlockID blockID, ChunkInfo info)
+     throws StorageContainerException {
+   try {
+     KeyValueContainerData containerData = (KeyValueContainerData) container
+         .getContainerData();
+     ByteBuffer data;
+
+     // Checking here, which layout version the container is, and reading
+     // the chunk file in that format.
+     // In version1, we verify checksum if it is available and return data
+     // of the chunk file.
+     if (containerData.getLayOutVersion() == ChunkLayOutVersion
+         .getLatestVersion().getVersion()) {
+       File chunkFile = ChunkUtils.getChunkFile(containerData, info);
+       data = ChunkUtils.readData(chunkFile, info);
+       containerData.incrReadCount();
+       containerData.incrReadBytes(chunkFile.length());
+       return data.array();
+     }
+   } catch (NoSuchAlgorithmException ex) {
+     // The exception is passed as the trailing Throwable argument so the
+     // logger prints its stack trace; no placeholder is needed for it.
+     LOG.error("read data failed.", ex);
+     throw new StorageContainerException("Internal error: ",
+         ex, NO_SUCH_ALGORITHM);
+   } catch (ExecutionException ex) {
+     LOG.error("read data failed.", ex);
+     throw new StorageContainerException("Internal error: ",
+         ex, CONTAINER_INTERNAL_ERROR);
+   } catch (InterruptedException e) {
+     // Restore the interrupt flag before translating the exception.
+     Thread.currentThread().interrupt();
+     LOG.error("read data failed.", e);
+     throw new StorageContainerException("Internal error: ",
+         e, CONTAINER_INTERNAL_ERROR);
+   }
+   // Reached only when the layout version is not the latest version.
+   return null;
+ }
+
+ /**
+  * Deletes a given chunk.
+  *
+  * Only whole-file chunks can be deleted: the chunk must start at offset 0
+  * and its length must equal the length of the chunk file. Nothing is done
+  * when the container layout version is not the latest version.
+  *
+  * @param container - Container for the chunk
+  * @param blockID - ID of the block
+  * @param info - Chunk Info
+  * @throws StorageContainerException with UNSUPPORTED_REQUEST when the chunk
+  * does not cover the whole chunk file.
+  */
+ public void deleteChunk(Container container, BlockID blockID, ChunkInfo info)
+     throws StorageContainerException {
+   Preconditions.checkNotNull(blockID, "Block ID cannot be null.");
+   KeyValueContainerData containerData = (KeyValueContainerData) container
+       .getContainerData();
+   // Checking here, which layout version the container is, and performing
+   // deleting chunk operation.
+   // In version1, we have only chunk file.
+   if (containerData.getLayOutVersion() == ChunkLayOutVersion
+       .getLatestVersion().getVersion()) {
+     File chunkFile = ChunkUtils.getChunkFile(containerData, info);
+     // Capture the size before deleting: File.length() returns 0 for a file
+     // that no longer exists, which would leave the usage counter unchanged.
+     long chunkFileSize = chunkFile.length();
+     if ((info.getOffset() == 0) && (info.getLen() == chunkFileSize)) {
+       FileUtil.fullyDelete(chunkFile);
+       containerData.decrBytesUsed(chunkFileSize);
+     } else {
+       LOG.error("Not Supported Operation. Trying to delete a " +
+           "chunk that is in shared file. chunk info : " + info.toString());
+       throw new StorageContainerException("Not Supported Operation. " +
+           "Trying to delete a chunk that is in shared file. chunk info : "
+           + info.toString(), UNSUPPORTED_REQUEST);
+     }
+   }
+ }
+
+ /**
+ * Shuts down the chunk manager.
+ *
+ * This implementation is currently a no-op: the method body is empty, so
+ * calling it releases nothing.
+ */
+
+ public void shutdown() {
+ //TODO: need to revisit this during integration of container IO.
+ }
+
+ /**
+  * Builds the temporary chunk file that sits next to the given chunk file.
+  *
+  * The temporary file lives in the same parent directory and is named after
+  * the chunk file, followed by the chunk name delimiter and the temporary
+  * chunk prefix constant.
+  *
+  * @param chunkFile - the actual chunk file
+  * @param info - chunk info (not read by this method)
+  * @return temporary chunkFile path
+  * @throws StorageContainerException declared for callers; not thrown here
+  */
+ private File getTmpChunkFile(File chunkFile, ChunkInfo info)
+     throws StorageContainerException {
+   String parentDir = chunkFile.getParent();
+   String tmpChunkName = chunkFile.getName()
+       + OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER
+       + OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX;
+   return new File(parentDir, tmpChunkName);
+ }
+
+ /**
+  * Commits a chunk by moving the temporary chunk file onto the chunk file,
+  * replacing the chunk file if it already exists.
+  *
+  * @param tmpChunkFile - source of the move
+  * @param chunkFile - destination of the move
+  * @throws IOException if the move fails
+  */
+ private void commitChunk(File tmpChunkFile, File chunkFile) throws
+     IOException {
+   final StandardCopyOption overwriteExisting =
+       StandardCopyOption.REPLACE_EXISTING;
+   Files.move(tmpChunkFile.toPath(), chunkFile.toPath(), overwriteExisting);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca192cb7/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyManagerImpl.java
new file mode 100644
index 0000000..87565ce
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyManagerImpl.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.KeyManager;
+import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
+import org.apache.hadoop.utils.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_KEY;
+
+/**
+ * This class is for performing key related operations on the KeyValue
+ * Container.
+ */
+public class KeyManagerImpl implements KeyManager {
+
+ static final Logger LOG = LoggerFactory.getLogger(KeyManagerImpl.class);
+
+ private Configuration config;
+
+ /**
+  * Creates a key manager bound to the given configuration.
+  *
+  * @param conf - Ozone configuration, must not be null
+  */
+ public KeyManagerImpl(Configuration conf) {
+   // checkNotNull hands back its argument, so validate and assign together.
+   this.config = Preconditions.checkNotNull(conf, "Config cannot be null");
+ }
+
+ /**
+  * Stores a key in the container's metadata DB, overwriting any value
+  * already stored under the same local ID.
+  *
+  * @param container - Container to which the key is added.
+  * @param data - Key Data; must not be null and must carry a non-negative
+  * container ID.
+  * @throws IOException if the DB cannot be obtained or written.
+  */
+ public void putKey(Container container, KeyData data) throws IOException {
+   Preconditions.checkNotNull(data,
+       "KeyData cannot be null for put operation.");
+   Preconditions.checkState(data.getContainerID() >= 0,
+       "Container Id cannot be negative");
+
+   // We are not locking the key manager since LevelDb serializes all actions
+   // against a single DB. We rely on DB level locking to avoid conflicts.
+   KeyValueContainerData containerData =
+       (KeyValueContainerData) container.getContainerData();
+   MetadataStore store = KeyUtils.getDB(containerData, config);
+
+   // Post condition acting as a hint to the user; should never fail.
+   Preconditions.checkNotNull(store, "DB cannot be null here");
+
+   // The local ID is the DB key, the serialized protobuf message the value.
+   byte[] dbKey = Longs.toByteArray(data.getLocalID());
+   byte[] dbValue = data.getProtoBufMessage().toByteArray();
+   store.put(dbKey, dbValue);
+ }
+
+ /**
+  * Looks up an existing key in the container's metadata DB by the local ID
+  * carried in {@code data}.
+  *
+  * @param container - Container from which the key is read.
+  * @param data - Key Data identifying the key; must not be null.
+  * @return Key Data parsed from the stored protobuf bytes.
+  * @throws IOException if the DB cannot be read or the stored bytes cannot
+  * be parsed; a StorageContainerException with NO_SUCH_KEY is thrown when
+  * the key is absent.
+  */
+ public KeyData getKey(Container container, KeyData data) throws IOException {
+   Preconditions.checkNotNull(data, "Key data cannot be null");
+   // NOTE(review): putKey compares getContainerID() with ">= 0", so it is
+   // presumably a primitive long and this null check can never fail -
+   // confirm and consider a range check instead.
+   Preconditions.checkNotNull(data.getContainerID(),
+       "Container name cannot be null");
+
+   KeyValueContainerData containerData =
+       (KeyValueContainerData) container.getContainerData();
+   MetadataStore store = KeyUtils.getDB(containerData, config);
+   // Post condition acting as a hint to the user; should never fail.
+   Preconditions.checkNotNull(store, "DB cannot be null here");
+
+   byte[] dbKey = Longs.toByteArray(data.getLocalID());
+   byte[] storedBytes = store.get(dbKey);
+   if (storedBytes == null) {
+     throw new StorageContainerException("Unable to find the key.",
+         NO_SUCH_KEY);
+   }
+   return KeyData.getFromProtoBuf(
+       ContainerProtos.KeyData.parseFrom(storedBytes));
+ }
+
+ /**
+  * Removes an existing key from the container's metadata DB.
+  *
+  * @param container - Container from which the key is deleted.
+  * @param blockID - ID of the block; must not be null and both its container
+  * ID and local ID must be non-negative.
+  * @throws IOException if the DB cannot be accessed; a
+  * StorageContainerException with NO_SUCH_KEY is thrown when the key is
+  * absent.
+  */
+ public void deleteKey(Container container, BlockID blockID) throws
+     IOException {
+   Preconditions.checkNotNull(blockID, "block ID cannot be null.");
+   Preconditions.checkState(blockID.getContainerID() >= 0,
+       "Container ID cannot be negative.");
+   Preconditions.checkState(blockID.getLocalID() >= 0,
+       "Local ID cannot be negative.");
+
+   KeyValueContainerData containerData =
+       (KeyValueContainerData) container.getContainerData();
+   MetadataStore store = KeyUtils.getDB(containerData, config);
+   // Post condition acting as a hint to the user; should never fail.
+   Preconditions.checkNotNull(store, "DB cannot be null here");
+
+   // Note : There is a race condition here, since get and delete
+   // are not atomic. Leaving it here since the impact is refusing
+   // to delete a key which might have just gotten inserted after
+   // the get check.
+   byte[] dbKey = Longs.toByteArray(blockID.getLocalID());
+   boolean keyExists = store.get(dbKey) != null;
+   if (!keyExists) {
+     throw new StorageContainerException("Unable to find the key.",
+         NO_SUCH_KEY);
+   }
+   store.delete(dbKey);
+ }
+
+ /**
+  * List keys in a container.
+  *
+  * The container read lock is held while the metadata DB is scanned and is
+  * always released before returning, including when the scan throws.
+  * Each returned entry is built from the block ID of the stored key only.
+  *
+  * @param container - Container from which keys need to be listed.
+  * @param startLocalID - Key to start from, 0 to begin.
+  * @param count - Number of keys to return; must be positive.
+  * @return List of Keys that match the criteria.
+  * @throws IOException if the DB cannot be read or a stored value cannot be
+  * parsed.
+  */
+ public List<KeyData> listKey(Container container, long startLocalID, int
+     count) throws IOException {
+   Preconditions.checkNotNull(container, "container cannot be null");
+   Preconditions.checkState(startLocalID >= 0, "startLocal ID cannot be " +
+       "negative");
+   Preconditions.checkArgument(count > 0,
+       "Count must be a positive number.");
+   container.readLock();
+   try {
+     KeyValueContainerData cData = (KeyValueContainerData) container
+         .getContainerData();
+     MetadataStore db = KeyUtils.getDB(cData, config);
+     List<KeyData> result = new ArrayList<>();
+     byte[] startKeyInBytes = Longs.toByteArray(startLocalID);
+     List<Map.Entry<byte[], byte[]>> range = db.getSequentialRangeKVs(
+         startKeyInBytes, count, null);
+     for (Map.Entry<byte[], byte[]> entry : range) {
+       KeyData value = KeyUtils.getKeyData(entry.getValue());
+       KeyData data = new KeyData(value.getBlockID());
+       result.add(data);
+     }
+     return result;
+   } finally {
+     // Without this the read lock acquired above is never released.
+     container.readUnlock();
+   }
+ }
+
+ /**
+  * Shuts down the key manager by shutting down the container cache obtained
+  * for this manager's configuration.
+  */
+ public void shutdown() {
+   ContainerCache dbHandleCache = ContainerCache.getInstance(config);
+   KeyUtils.shutdownCache(dbHandleCache);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca192cb7/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
new file mode 100644
index 0000000..c837ccc
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
+import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.AsynchronousFileChannel;
+import java.nio.channels.FileLock;
+import java.nio.file.StandardOpenOption;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*;
+
+/**
+ * Utility methods for chunk operations for KeyValue container.
+ */
+public final class ChunkUtils {
+
+ /** Never constructed. **/
+ private ChunkUtils() {
+
+ }
+
+ /**
+  * Writes the data in chunk Info to the specified location in the chunkfile.
+  *
+  * The file is opened (and created if missing), locked, and the data is
+  * written at the offset given by {@code chunkInfo}. When the chunk info
+  * carries a non-empty checksum, the data is verified against it before the
+  * write. The lock is released and the channel closed in all cases.
+  *
+  * @param chunkFile - File to write data to.
+  * @param chunkInfo - Chunk definition: length, offset and optional checksum.
+  * @param data - The data buffer.
+  * @throws StorageContainerException if the buffer length does not match the
+  * chunk length, the checksum does not match, the write is short, or an I/O
+  * error occurs.
+  * @throws ExecutionException if the asynchronous lock or write fails.
+  * @throws InterruptedException if interrupted while waiting for the lock or
+  * the write.
+  * @throws NoSuchAlgorithmException if the checksum algorithm is unavailable.
+  */
+ public static void writeData(File chunkFile, ChunkInfo chunkInfo,
+     byte[] data) throws
+     StorageContainerException, ExecutionException, InterruptedException,
+     NoSuchAlgorithmException {
+
+   Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
+   if (data.length != chunkInfo.getLen()) {
+     String err = String.format("data array does not match the length " +
+         "specified. DataLen: %d Byte Array: %d",
+         chunkInfo.getLen(), data.length);
+     log.error(err);
+     throw new StorageContainerException(err, INVALID_WRITE_SIZE);
+   }
+
+   AsynchronousFileChannel file = null;
+   FileLock lock = null;
+
+   try {
+     file =
+         AsynchronousFileChannel.open(chunkFile.toPath(),
+             StandardOpenOption.CREATE,
+             StandardOpenOption.WRITE,
+             StandardOpenOption.SPARSE,
+             StandardOpenOption.SYNC);
+     lock = file.lock().get();
+     if (chunkInfo.getChecksum() != null &&
+         !chunkInfo.getChecksum().isEmpty()) {
+       verifyChecksum(chunkInfo, data, log);
+     }
+     int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get();
+     if (size != data.length) {
+       log.error("Invalid write size found. Size:{} Expected: {} ", size,
+           data.length);
+       throw new StorageContainerException("Invalid write size found. " +
+           "Size: " + size + " Expected: " + data.length, INVALID_WRITE_SIZE);
+     }
+   } catch (StorageContainerException ex) {
+     throw ex;
+   } catch (IOException e) {
+     throw new StorageContainerException(e, IO_EXCEPTION);
+
+   } finally {
+     // The close is nested in its own finally so that a failure to release
+     // the lock cannot leak the open channel.
+     try {
+       if (lock != null) {
+         try {
+           lock.release();
+         } catch (IOException e) {
+           log.error("Unable to release lock ??, Fatal Error.");
+           throw new StorageContainerException(e, CONTAINER_INTERNAL_ERROR);
+         }
+       }
+     } finally {
+       if (file != null) {
+         try {
+           file.close();
+         } catch (IOException e) {
+           throw new StorageContainerException("Error closing chunk file",
+               e, CONTAINER_INTERNAL_ERROR);
+         }
+       }
+     }
+   }
+ }
+
+ /**
+  * Reads the byte range described by a chunk definition from an existing
+  * chunk file.
+  *
+  * The range is locked with a shared lock while it is read. When the chunk
+  * definition carries a non-empty checksum, the bytes read are verified
+  * against it.
+  *
+  * @param chunkFile - file where data lives.
+  * @param data - chunk definition: offset, length and optional checksum.
+  * @return ByteBuffer holding the bytes read.
+  * @throws StorageContainerException if the file does not exist, the checksum
+  * does not match, or an I/O error occurs.
+  * @throws ExecutionException if the asynchronous lock or read fails.
+  * @throws InterruptedException if interrupted while waiting for the lock or
+  * the read.
+  * @throws NoSuchAlgorithmException if the checksum algorithm is unavailable.
+  */
+ public static ByteBuffer readData(File chunkFile, ChunkInfo data) throws
+     StorageContainerException, ExecutionException, InterruptedException,
+     NoSuchAlgorithmException {
+   Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
+
+   boolean chunkFilePresent = chunkFile.exists();
+   if (!chunkFilePresent) {
+     log.error("Unable to find the chunk file. chunk info : {}",
+         data.toString());
+     throw new StorageContainerException(
+         "Unable to find the chunk file. chunk info " + data.toString(),
+         UNABLE_TO_FIND_CHUNK);
+   }
+
+   AsynchronousFileChannel channel = null;
+   FileLock sharedLock = null;
+   try {
+     channel = AsynchronousFileChannel.open(chunkFile.toPath(),
+         StandardOpenOption.READ);
+     // "true" requests a shared lock over just the chunk's byte range.
+     sharedLock = channel.lock(data.getOffset(), data.getLen(), true).get();
+
+     ByteBuffer chunkBytes = ByteBuffer.allocate((int) data.getLen());
+     channel.read(chunkBytes, data.getOffset()).get();
+
+     boolean checksumMissing =
+         data.getChecksum() == null || data.getChecksum().isEmpty();
+     if (!checksumMissing) {
+       verifyChecksum(data, chunkBytes.array(), log);
+     }
+
+     return chunkBytes;
+   } catch (IOException e) {
+     throw new StorageContainerException(e, IO_EXCEPTION);
+   } finally {
+     if (sharedLock != null) {
+       try {
+         sharedLock.release();
+       } catch (IOException e) {
+         // Best effort: a failed release is logged, not propagated.
+         log.error("I/O error is lock release.");
+       }
+     }
+     if (channel != null) {
+       IOUtils.closeStream(channel);
+     }
+   }
+ }
+
+ /**
+  * Verifies the checksum of a chunk against the data buffer.
+  *
+  * The hex-encoded digest of {@code data} is compared with the checksum
+  * carried by the chunk info; on mismatch both values are logged and
+  * reported in the exception.
+  *
+  * @param chunkInfo - Chunk Info carrying the expected checksum.
+  * @param data - data buffer
+  * @param log - log
+  * @throws NoSuchAlgorithmException if the hash algorithm is unavailable.
+  * @throws StorageContainerException with CHECKSUM_MISMATCH when the computed
+  * checksum differs from the provided one.
+  */
+ private static void verifyChecksum(ChunkInfo chunkInfo, byte[] data, Logger
+     log) throws NoSuchAlgorithmException, StorageContainerException {
+   MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
+   sha.update(data);
+   // digest() resets the MessageDigest, so compute the value exactly once
+   // and reuse it; a second digest() call would hash empty input.
+   String computed = Hex.encodeHexString(sha.digest());
+   if (!computed.equals(chunkInfo.getChecksum())) {
+     log.error("Checksum mismatch. Provided: {} , computed: {}",
+         chunkInfo.getChecksum(), computed);
+     throw new StorageContainerException("Checksum mismatch. Provided: " +
+         chunkInfo.getChecksum() + " , computed: " +
+         computed, CHECKSUM_MISMATCH);
+   }
+ }
+
+ /**
+  * Resolves the chunk file for a write and rejects the write when it would
+  * overwrite existing chunk data without the overwrite flag being set.
+  *
+  * @param data - container data.
+  * @param info - chunk info.
+  * @return File the chunk data is expected to be written to.
+  * @throws StorageContainerException with OVERWRITE_FLAG_REQUIRED when an
+  * overwrite is attempted without permission, or when the chunk file cannot
+  * be resolved.
+  */
+ public static File validateChunk(KeyValueContainerData data, ChunkInfo info)
+     throws StorageContainerException {
+
+   Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
+
+   File chunkFile = getChunkFile(data, info);
+   // Short-circuit: permission is only consulted for an actual overwrite.
+   boolean overwriteWithoutFlag =
+       isOverWriteRequested(chunkFile, info) && !isOverWritePermitted(info);
+   if (overwriteWithoutFlag) {
+     log.error("Rejecting write chunk request. Chunk overwrite " +
+         "without explicit request. {}", info.toString());
+     throw new StorageContainerException(
+         "Rejecting write chunk request. OverWrite flag required."
+             + info.toString(),
+         OVERWRITE_FLAG_REQUIRED);
+   }
+   return chunkFile;
+ }
+
+ /**
+  * Resolves the file of a chunk inside the container's chunks directory,
+  * after checking that the chunks directory is configured and exists.
+  *
+  * @param containerData - Container Data, must not be null
+  * @param info - Chunk info supplying the chunk name
+  * @return - File for the chunk name resolved against the chunks directory.
+  * @throws StorageContainerException with UNABLE_TO_FIND_DATA_DIR when the
+  * chunks path is null or does not exist.
+  */
+ public static File getChunkFile(KeyValueContainerData containerData,
+     ChunkInfo info) throws
+     StorageContainerException {
+
+   Preconditions.checkNotNull(containerData, "Container data can't be null");
+   Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
+
+   final String chunksDirPath = containerData.getChunksPath();
+   if (chunksDirPath == null) {
+     log.error("Chunks path is null in the container data");
+     throw new StorageContainerException("Unable to get Chunks directory.",
+         UNABLE_TO_FIND_DATA_DIR);
+   }
+
+   final File chunksDir = new File(chunksDirPath);
+   if (!chunksDir.exists()) {
+     log.error("Chunks path does not exist");
+     throw new StorageContainerException("Unable to get Chunks directory.",
+         UNABLE_TO_FIND_DATA_DIR);
+   }
+
+   final String chunkName = info.getChunkName();
+   return chunksDir.toPath().resolve(chunkName).toFile();
+ }
+
+ /**
+  * Tells whether writing the chunk would overwrite an existing range of the
+  * chunk file, i.e. the file exists and the chunk offset lies before the
+  * current end of the file.
+  *
+  * @param chunkFile - File
+  * @param chunkInfo - Chunk info supplying the write offset
+  * @return true when the write starts inside the existing file contents.
+  */
+ public static boolean isOverWriteRequested(File chunkFile, ChunkInfo
+     chunkInfo) {
+   // A missing file cannot be overwritten; the offset is not consulted then.
+   return chunkFile.exists()
+       && chunkInfo.getOffset() < chunkFile.length();
+ }
+
+ /**
+  * Overwrite is permitted if and only if the user explicitly asks for it:
+  * the chunk metadata must contain the overwrite key with a value that
+  * parses as boolean true.
+  *
+  * @param chunkInfo - Chunk info
+  * @return true if the user asks for it.
+  */
+ public static boolean isOverWritePermitted(ChunkInfo chunkInfo) {
+   String overwriteFlag =
+       chunkInfo.getMetadata().get(OzoneConsts.CHUNK_OVERWRITE);
+   // An absent or empty flag never grants permission.
+   if (overwriteFlag == null || overwriteFlag.isEmpty()) {
+     return false;
+   }
+   return Boolean.valueOf(overwriteFlag);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca192cb7/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
index 7d9f0e6..d45f598 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
@@ -20,14 +20,19 @@ package org.apache.hadoop.ozone.container.keyvalue.helpers;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
import org.apache.hadoop.utils.MetadataStore;
import java.io.IOException;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNABLE_TO_READ_METADATA_DB;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .Result.NO_SUCH_KEY;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .Result.UNABLE_TO_READ_METADATA_DB;
/**
* Utils functions to help key functions.
@@ -79,4 +84,32 @@ public final class KeyUtils {
cache.removeDB(container.getContainerId());
}
-}
+ /**
+ * Shuts down all DB handles by delegating to the given cache's
+ * shutdownCache method.
+ *
+ * @param cache - Cache for DB Handles.
+ */
+ @SuppressWarnings("unchecked")
+ public static void shutdownCache(ContainerCache cache) {
+ cache.shutdownCache();
+ }
+
+ /**
+  * Parses the {@link KeyData} from a bytes array.
+  *
+  * @param bytes key data in bytes.
+  * @return key data.
+  * @throws IOException if the bytes array is malformed or invalid; the
+  * failure is reported as a StorageContainerException with NO_SUCH_KEY that
+  * carries the original parse error as its cause.
+  */
+ public static KeyData getKeyData(byte[] bytes) throws IOException {
+   try {
+     ContainerProtos.KeyData keyData = ContainerProtos.KeyData.parseFrom(
+         bytes);
+     KeyData data = KeyData.getFromProtoBuf(keyData);
+     return data;
+   } catch (IOException e) {
+     // Keep the parse failure as the cause so it is not lost to callers.
+     throw new StorageContainerException("Failed to parse key data from the" +
+         " bytes array.", e, NO_SUCH_KEY);
+   }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca192cb7/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
new file mode 100644
index 0000000..7134be1
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
@@ -0,0 +1,80 @@
+package org.apache.hadoop.ozone.container.keyvalue.interfaces;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+
+/**
+ * Chunk Manager allows read, write and delete of chunks in a container.
+ */
+
+public interface ChunkManager {
+
+ /**
+ * Writes a given chunk.
+ *
+ * @param container - Container for the chunk
+ * @param blockID - ID of the block.
+ * @param info - ChunkInfo.
+ * @param data - Bytes of the chunk to be written.
+ * @param stage - Chunk Stage write.
+ * @throws StorageContainerException if the chunk cannot be written.
+ */
+ void writeChunk(Container container, BlockID blockID, ChunkInfo info,
+ byte[] data, ContainerProtos.Stage stage)
+ throws StorageContainerException;
+
+ /**
+ * Reads the data defined by a chunk.
+ *
+ * TODO: Right now we do not support partial reads and writes of chunks.
+ * TODO: Explore if we need to do that for ozone.
+ *
+ * @param container - Container for the chunk
+ * @param blockID - ID of the block.
+ * @param info - ChunkInfo.
+ * @return byte array
+ * @throws StorageContainerException if the chunk cannot be read.
+ */
+ byte[] readChunk(Container container, BlockID blockID, ChunkInfo info) throws
+ StorageContainerException;
+
+ /**
+ * Deletes a given chunk.
+ *
+ * @param container - Container for the chunk
+ * @param blockID - ID of the block.
+ * @param info - Chunk Info
+ * @throws StorageContainerException if the chunk cannot be deleted.
+ */
+ void deleteChunk(Container container, BlockID blockID, ChunkInfo info) throws
+ StorageContainerException;
+
+ // TODO : Support list operations.
+
+ /**
+ * Shutdown the chunkManager.
+ */
+ void shutdown();
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca192cb7/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java
new file mode 100644
index 0000000..ebda97e
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/KeyManager.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue.interfaces;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * KeyManager is for performing key related operations on the container.
+ */
+public interface KeyManager {
+
+ /**
+ * Puts or overwrites a key.
+ *
+ * @param container - Container to which the key is added.
+ * @param data - Key Data.
+ * @throws IOException if the key cannot be stored.
+ */
+ void putKey(Container container, KeyData data) throws IOException;
+
+ /**
+ * Gets an existing key.
+ *
+ * @param container - Container from which the key is fetched.
+ * @param data - Key Data carrying the BlockID of the key to look up.
+ * @return Key Data.
+ * @throws IOException if the key cannot be read.
+ */
+ KeyData getKey(Container container, KeyData data) throws IOException;
+
+ /**
+ * Deletes an existing Key.
+ *
+ * @param container - Container from which key need to be deleted.
+ * @param blockID - ID of the block.
+ * @throws IOException if the key cannot be deleted; implementations may
+ * signal this with the more specific {@link StorageContainerException}.
+ */
+ void deleteKey(Container container, BlockID blockID) throws IOException;
+
+ /**
+ * List keys in a container.
+ *
+ * @param container - Container from which keys need to be listed.
+ * @param startLocalID - Key to start from, 0 to begin.
+ * @param count - Number of keys to return.
+ * @return List of Keys that match the criteria.
+ * @throws IOException if the keys cannot be listed.
+ */
+ List<KeyData> listKey(Container container, long startLocalID, int count) throws
+ IOException;
+
+ /** Shutdown KeyManager. */
+ void shutdown();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca192cb7/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
new file mode 100644
index 0000000..ca936c7
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+
+/**
+ * This class is used to test ChunkManager operations.
+ */
+public class TestChunkManagerImpl {
+
+ private OzoneConfiguration config;
+ private String scmId = UUID.randomUUID().toString();
+ private VolumeSet volumeSet;
+ private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy;
+ private KeyValueContainerData keyValueContainerData;
+ private KeyValueContainer keyValueContainer;
+ private KeyData keyData;
+ private BlockID blockID;
+ private ChunkManagerImpl chunkManager;
+ private ChunkInfo chunkInfo;
+ private byte[] data;
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @Before
+ public void setUp() throws Exception {
+ config = new OzoneConfiguration();
+ // Volume rooted in the JUnit temporary folder, with a random datanode UUID.
+ HddsVolume hddsVolume = new HddsVolume.Builder(folder.getRoot()
+ .getAbsolutePath()).conf(config).datanodeUuid(UUID.randomUUID()
+ .toString()).build();
+
+ volumeSet = mock(VolumeSet.class);
+ // The mocked policy returns hddsVolume for any list and any long argument.
+ volumeChoosingPolicy = mock(RoundRobinVolumeChoosingPolicy.class);
+ Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
+ .thenReturn(hddsVolume);
+
+ keyValueContainerData = new KeyValueContainerData(
+ ContainerProtos.ContainerType.KeyValueContainer, 1L);
+
+ keyValueContainer = new KeyValueContainer(
+ keyValueContainerData, config);
+
+ keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
+ // Test payload. NOTE(review): getBytes() uses the platform default charset.
+ data = "testing write chunks".getBytes();
+ // Creating KeyData for block (1, 1) with one chunk sized to data.length.
+ blockID = new BlockID(1L, 1L);
+ keyData = new KeyData(blockID);
+ keyData.addMetadata("VOLUME", "ozone");
+ keyData.addMetadata("OWNER", "hdfs");
+ List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
+ chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID
+ .getLocalID(), 0), 0, data.length);
+ chunkList.add(chunkInfo.getProtoBufMessage());
+ keyData.setChunks(chunkList);
+
+ // Create a ChunkManager object.
+ chunkManager = new ChunkManagerImpl();
+
+ }
+
+ @Test
+ public void testWriteChunkStageWriteAndCommit() throws Exception {
+ //As in Setup, we try to create container, these paths should exist.
+ assertTrue(keyValueContainerData.getChunksPath() != null);
+ File chunksPath = new File(keyValueContainerData.getChunksPath());
+ assertTrue(chunksPath.exists());
+ // Initially chunks folder should be empty.
+ assertEquals(0, chunksPath.listFiles().length);
+ chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
+ ContainerProtos.Stage.WRITE_DATA);
+ // Now a chunk file is being written with Stage WRITE_DATA, so it should
+ // create a temporary chunk file.
+ assertEquals(1, chunksPath.listFiles().length);
+ chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
+ ContainerProtos.Stage.COMMIT_DATA);
+ // Old temp file should have been renamed to chunk file.
+ assertEquals(1, chunksPath.listFiles().length);
+
+ }
+
+ @Test
+ public void testWriteChunkIncorrectLength() throws Exception {
+ try {
+ long declaredLength = 200L;
+ String chunkName = String.format("%d.data.%d", blockID.getLocalID(), 0);
+ chunkInfo = new ChunkInfo(chunkName, 0, declaredLength);
+ List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
+ chunks.add(chunkInfo.getProtoBufMessage());
+ keyData.setChunks(chunks);
+ chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
+ ContainerProtos.Stage.WRITE_DATA);
+ fail("testWriteChunkIncorrectLength failed");
+ } catch (StorageContainerException e) {
+ GenericTestUtils.assertExceptionContains("data array does not match " +
+ "the length ", e);
+ assertEquals(ContainerProtos.Result.INVALID_WRITE_SIZE, e.getResult());
+ }
+ }
+
+ @Test
+ public void testWriteChunkStageCombinedData() throws Exception {
+ //As in Setup, we try to create container, these paths should exist.
+ assertTrue(keyValueContainerData.getChunksPath() != null);
+ File chunksPath = new File(keyValueContainerData.getChunksPath());
+ assertTrue(chunksPath.exists());
+ // Initially chunks folder should be empty.
+ assertEquals(0, chunksPath.listFiles().length);
+ chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
+ ContainerProtos.Stage.COMBINED);
+ // The chunk is written with Stage COMBINED here (not WRITE_DATA), so
+ // exactly one file is expected in the chunks folder afterwards.
+ assertEquals(1, chunksPath.listFiles().length);
+ }
+
+ @Test
+ public void testReadChunk() throws Exception {
+ chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
+ ContainerProtos.Stage.COMBINED);
+ byte[] actualData = chunkManager.readChunk(keyValueContainer, blockID,
+ chunkInfo);
+ assertEquals(data.length, actualData.length);
+ assertTrue(Arrays.equals(data, actualData));
+ }
+
+ @Test
+ public void testDeleteChunk() throws Exception {
+ File chunksPath = new File(keyValueContainerData.getChunksPath());
+ chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
+ ContainerProtos.Stage.COMBINED);
+ assertEquals(1, chunksPath.listFiles().length);
+ chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo);
+ assertEquals(0, chunksPath.listFiles().length);
+ }
+
+ @Test
+ public void testDeleteChunkUnsupportedRequest() throws Exception {
+ try {
+ chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
+ ContainerProtos.Stage.COMBINED);
+ long declaredLength = 200L;
+ String chunkName = String.format("%d.data.%d", blockID.getLocalID(), 0);
+ chunkInfo = new ChunkInfo(chunkName, 0, declaredLength);
+ List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
+ chunks.add(chunkInfo.getProtoBufMessage());
+ keyData.setChunks(chunks);
+ chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo);
+ fail("testDeleteChunkUnsupportedRequest");
+ } catch (StorageContainerException e) {
+ GenericTestUtils.assertExceptionContains("Not Supported Operation.", e);
+ assertEquals(ContainerProtos.Result.UNSUPPORTED_REQUEST, e.getResult());
+ }
+ }
+
+ @Test
+ public void testWriteChunkChecksumMismatch() throws Exception {
+ try {
+ String chunkName = String.format("%d.data.%d", blockID.getLocalID(), 0);
+ chunkInfo = new ChunkInfo(chunkName, 0, data.length);
+ // Attach a checksum value that is expected not to match the data.
+ chunkInfo.setChecksum("some garbage");
+ List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
+ chunks.add(chunkInfo.getProtoBufMessage());
+ keyData.setChunks(chunks);
+ chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, data,
+ ContainerProtos.Stage.COMBINED);
+ fail("testWriteChunkChecksumMismatch failed");
+ } catch (StorageContainerException e) {
+ GenericTestUtils.assertExceptionContains("Checksum mismatch.", e);
+ assertEquals(ContainerProtos.Result.CHECKSUM_MISMATCH, e.getResult());
+ }
+ }
+
+ @Test
+ public void testReadChunkFileNotExists() throws Exception {
+ try {
+ // Nothing has been written in this test, so the chunk file does not
+ // exist; the result of readChunk is intentionally not captured.
+ chunkManager.readChunk(keyValueContainer, blockID, chunkInfo);
+ fail("testReadChunkFileNotExists failed");
+ } catch (StorageContainerException ex) {
+ GenericTestUtils.assertExceptionContains("Unable to find the chunk " +
+ "file.", ex);
+ assertEquals(ContainerProtos.Result.UNABLE_TO_FIND_CHUNK, ex.getResult());
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca192cb7/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java
new file mode 100644
index 0000000..a6f50c4
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyManagerImpl.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.volume
+ .RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+
+/**
+ * This class is used to test key related operations on the container.
+ */
+public class TestKeyManagerImpl {
+
+ private OzoneConfiguration config;
+ private String scmId = UUID.randomUUID().toString();
+ private VolumeSet volumeSet;
+ private RoundRobinVolumeChoosingPolicy volumeChoosingPolicy;
+ private KeyValueContainerData keyValueContainerData;
+ private KeyValueContainer keyValueContainer;
+ private KeyData keyData;
+ private KeyManagerImpl keyValueContainerManager;
+ private BlockID blockID;
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+
+ @Before
+ public void setUp() throws Exception {
+ config = new OzoneConfiguration();
+ // Volume rooted in the JUnit temporary folder, with a random datanode UUID.
+ HddsVolume hddsVolume = new HddsVolume.Builder(folder.getRoot()
+ .getAbsolutePath()).conf(config).datanodeUuid(UUID.randomUUID()
+ .toString()).build();
+
+ volumeSet = mock(VolumeSet.class);
+ // The mocked policy returns hddsVolume for any list and any long argument.
+ volumeChoosingPolicy = mock(RoundRobinVolumeChoosingPolicy.class);
+ Mockito.when(volumeChoosingPolicy.chooseVolume(anyList(), anyLong()))
+ .thenReturn(hddsVolume);
+
+ keyValueContainerData = new KeyValueContainerData(
+ ContainerProtos.ContainerType.KeyValueContainer, 1L);
+
+ keyValueContainer = new KeyValueContainer(
+ keyValueContainerData, config);
+
+ keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
+
+ // Creating KeyData for block (1, 1) with a single chunk entry.
+ blockID = new BlockID(1L, 1L);
+ keyData = new KeyData(blockID);
+ keyData.addMetadata("VOLUME", "ozone");
+ keyData.addMetadata("OWNER", "hdfs");
+ List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
+ ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", blockID
+ .getLocalID(), 0), 0, 1024);
+ chunkList.add(info.getProtoBufMessage());
+ keyData.setChunks(chunkList);
+
+ // Create the KeyManagerImpl under test.
+ keyValueContainerManager = new KeyManagerImpl(config);
+
+ }
+
+ @Test
+ public void testPutAndGetKey() throws Exception {
+ // Store the key first.
+ keyValueContainerManager.putKey(keyValueContainer, keyData);
+
+ // Read it back through the same manager.
+ KeyData fetched = keyValueContainerManager.getKey(keyValueContainer,
+ keyData);
+
+ // The fetched copy must agree with the stored one on IDs and on the
+ // number of chunks and metadata entries.
+ assertEquals(keyData.getContainerID(), fetched.getContainerID());
+ assertEquals(keyData.getLocalID(), fetched.getLocalID());
+ assertEquals(keyData.getChunks().size(), fetched.getChunks().size());
+ assertEquals(keyData.getMetadata().size(), fetched.getMetadata().size());
+ }
+
+
+ @Test
+ public void testDeleteKey() throws Exception {
+ keyValueContainerManager.putKey(keyValueContainer, keyData);
+ keyValueContainerManager.deleteKey(keyValueContainer, blockID);
+ try { // the deleted key must no longer be readable
+ keyValueContainerManager.getKey(keyValueContainer, keyData);
+ fail("testDeleteKey failed");
+ } catch (StorageContainerException ex) {
+ assertEquals(ContainerProtos.Result.NO_SUCH_KEY, ex.getResult());
+ }
+ }
+
+ @Test
+ public void testListKey() throws Exception {
+ // No try/catch here: an unexpected IOException propagates, so JUnit
+ // reports the real cause instead of a bare fail().
+ // With only the key from setUp stored, exactly one key is listed.
+ keyValueContainerManager.putKey(keyValueContainer, keyData);
+ List<KeyData> listKeyData = keyValueContainerManager.listKey(
+ keyValueContainer, 1, 10);
+ assertNotNull(listKeyData);
+ assertEquals(1, listKeyData.size());
+
+ // Add nine more keys (local IDs 2..10) to the same container.
+ for (long i = 2; i <= 10; i++) {
+ blockID = new BlockID(1L, i);
+ keyData = new KeyData(blockID);
+ keyData.addMetadata("VOLUME", "ozone");
+ keyData.addMetadata("OWNER", "hdfs");
+ List<ContainerProtos.ChunkInfo> chunkList = new LinkedList<>();
+ ChunkInfo info = new ChunkInfo(String.format("%d.data.%d", blockID
+ .getLocalID(), 0), 0, 1024);
+ chunkList.add(info.getProtoBufMessage());
+ keyData.setChunks(chunkList);
+ keyValueContainerManager.putKey(keyValueContainer, keyData);
+ }
+
+ // Listing from local ID 1 with count 10 must now return all ten keys.
+ listKeyData = keyValueContainerManager.listKey(
+ keyValueContainer, 1, 10);
+ assertNotNull(listKeyData);
+ assertEquals(10, listKeyData.size());
+ }
+
+ @Test
+ public void testGetNoSuchKey() throws Exception {
+ try {
+ keyData = new KeyData(new BlockID(1L, 2L));
+ keyValueContainerManager.getKey(keyValueContainer, keyData);
+ fail("testGetNoSuchKey failed");
+ } catch (StorageContainerException e) {
+ assertEquals(ContainerProtos.Result.NO_SUCH_KEY, e.getResult());
+ GenericTestUtils.assertExceptionContains("Unable to find the key.", e);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org