Posted to common-commits@hadoop.apache.org by bh...@apache.org on 2018/07/09 20:33:26 UTC
[17/37] hadoop git commit: HDDS-173. Refactor Dispatcher and
implement Handler for new ContainerIO design.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyManagerImpl.java
deleted file mode 100644
index 87565ce..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyManagerImpl.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.keyvalue;
-
-import com.google.common.base.Preconditions;
-import com.google.common.primitives.Longs;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.client.BlockID;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
-import org.apache.hadoop.ozone.container.common.helpers.KeyData;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.common.interfaces.Container;
-import org.apache.hadoop.ozone.container.keyvalue.interfaces.KeyManager;
-import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
-import org.apache.hadoop.utils.MetadataStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_KEY;
-
-/**
- * This class is for performing key-related operations on the KeyValue
- * Container.
- */
-public class KeyManagerImpl implements KeyManager {
-
- static final Logger LOG = LoggerFactory.getLogger(KeyManagerImpl.class);
-
- private Configuration config;
-
- /**
- * Constructs a key Manager.
- *
- * @param conf - Ozone configuration
- */
- public KeyManagerImpl(Configuration conf) {
- Preconditions.checkNotNull(conf, "Config cannot be null");
- this.config = conf;
- }
-
- /**
- * Puts or overwrites a key.
- *
- * @param container - Container to which the key needs to be added.
- * @param data - Key Data.
- * @throws IOException
- */
- public void putKey(Container container, KeyData data) throws IOException {
- Preconditions.checkNotNull(data, "KeyData cannot be null for put " +
- "operation.");
- Preconditions.checkState(data.getContainerID() >= 0, "Container Id " +
- "cannot be negative");
- // We are not locking the key manager since LevelDb serializes all actions
- // against a single DB. We rely on DB level locking to avoid conflicts.
- MetadataStore db = KeyUtils.getDB((KeyValueContainerData) container
- .getContainerData(), config);
-
- // This is a post condition that acts as a hint to the user.
- // Should never fail.
- Preconditions.checkNotNull(db, "DB cannot be null here");
- db.put(Longs.toByteArray(data.getLocalID()), data.getProtoBufMessage()
- .toByteArray());
- }
-
- /**
- * Gets an existing key.
- *
- * @param container - Container from which the key needs to be fetched.
- * @param data - Key Data.
- * @return Key Data.
- * @throws IOException
- */
- public KeyData getKey(Container container, KeyData data) throws IOException {
- Preconditions.checkNotNull(data, "Key data cannot be null");
- Preconditions.checkNotNull(data.getContainerID(), "Container name cannot" +
- " be null");
- KeyValueContainerData containerData = (KeyValueContainerData) container
- .getContainerData();
- MetadataStore db = KeyUtils.getDB(containerData, config);
- // This is a post condition that acts as a hint to the user.
- // Should never fail.
- Preconditions.checkNotNull(db, "DB cannot be null here");
- byte[] kData = db.get(Longs.toByteArray(data.getLocalID()));
- if (kData == null) {
- throw new StorageContainerException("Unable to find the key.",
- NO_SUCH_KEY);
- }
- ContainerProtos.KeyData keyData = ContainerProtos.KeyData.parseFrom(kData);
- return KeyData.getFromProtoBuf(keyData);
- }
-
- /**
- * Deletes an existing Key.
- *
- * @param container - Container from which the key needs to be deleted.
- * @param blockID - ID of the block.
- * @throws StorageContainerException
- */
- public void deleteKey(Container container, BlockID blockID) throws
- IOException {
- Preconditions.checkNotNull(blockID, "block ID cannot be null.");
- Preconditions.checkState(blockID.getContainerID() >= 0,
- "Container ID cannot be negative.");
- Preconditions.checkState(blockID.getLocalID() >= 0,
- "Local ID cannot be negative.");
-
- KeyValueContainerData cData = (KeyValueContainerData) container
- .getContainerData();
- MetadataStore db = KeyUtils.getDB(cData, config);
- // This is a post condition that acts as a hint to the user.
- // Should never fail.
- Preconditions.checkNotNull(db, "DB cannot be null here");
- // Note : There is a race condition here, since get and delete
- // are not atomic. Leaving it here since the impact is refusing
- // to delete a key which might have just gotten inserted after
- // the get check.
- byte[] kKey = Longs.toByteArray(blockID.getLocalID());
- byte[] kData = db.get(kKey);
- if (kData == null) {
- throw new StorageContainerException("Unable to find the key.",
- NO_SUCH_KEY);
- }
- db.delete(kKey);
- }
-
- /**
- * List keys in a container.
- *
- * @param container - Container from which keys need to be listed.
- * @param startLocalID - Key to start from, 0 to begin.
- * @param count - Number of keys to return.
- * @return List of Keys that match the criteria.
- */
- public List<KeyData> listKey(Container container, long startLocalID, int
- count) throws IOException {
- Preconditions.checkNotNull(container, "container cannot be null");
- Preconditions.checkState(startLocalID >= 0, "startLocalID cannot be " +
- "negative");
- Preconditions.checkArgument(count > 0,
- "Count must be a positive number.");
- container.readLock();
- List<KeyData> result = null;
- KeyValueContainerData cData = (KeyValueContainerData) container
- .getContainerData();
- MetadataStore db = KeyUtils.getDB(cData, config);
- result = new ArrayList<>();
- byte[] startKeyInBytes = Longs.toByteArray(startLocalID);
- List<Map.Entry<byte[], byte[]>> range = db.getSequentialRangeKVs(
- startKeyInBytes, count, null);
- for (Map.Entry<byte[], byte[]> entry : range) {
- KeyData value = KeyUtils.getKeyData(entry.getValue());
- KeyData data = new KeyData(value.getBlockID());
- result.add(data);
- }
- return result;
- }
-
- /**
- * Shutdown KeyValueContainerManager.
- */
- public void shutdown() {
- KeyUtils.shutdownCache(ContainerCache.getInstance(config));
- }
-
-}
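
For orientation, a rough sketch of how this manager's API was driven before the move (the class reappears as org.apache.hadoop.ozone.container.keyvalue.impl.KeyManagerImpl, per the KeyValueHandler imports further down). The conf, container, and ID values below are illustrative, not part of the patch:

    // Assumes an initialized Configuration 'conf' and an open KeyValue
    // container 'container'; container ID 42 and local ID 1 are made up.
    KeyManager keyManager = new KeyManagerImpl(conf);

    KeyData keyData = new KeyData(new BlockID(42L, 1L));
    keyManager.putKey(container, keyData);             // put or overwrite
    KeyData fetched = keyManager.getKey(container, keyData);
    List<KeyData> batch = keyManager.listKey(container, 0, 100);
    keyManager.deleteKey(container, new BlockID(42L, 1L)); // NO_SUCH_KEY if absent
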
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 740967b..a1cbb4e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -18,29 +18,29 @@
package org.apache.hadoop.ozone.container.keyvalue;
-
import com.google.common.base.Preconditions;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerLifeCycleState;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-
-
+import org.apache.hadoop.hdds.scm.container.common.helpers
+ .StorageContainerException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.impl.ContainerData;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueYaml;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.keyvalue.helpers
+ .KeyValueContainerLocationUtil;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.utils.MetadataStore;
import org.slf4j.Logger;
@@ -116,9 +116,9 @@ public class KeyValueContainer implements Container {
Preconditions.checkNotNull(scmId, "scmId cannot be null");
File containerMetaDataPath = null;
+ //acquiring volumeset lock and container lock
+ volumeSet.acquireLock();
try {
- //acquiring volumeset lock and container lock
- volumeSet.acquireLock();
HddsVolume containerVolume = volumeChoosingPolicy.chooseVolume(volumeSet
.getVolumesList(), containerMaxSize);
String containerBasePath = containerVolume.getHddsRootDir().toString();
@@ -404,10 +404,19 @@ public class KeyValueContainer implements Container {
}
@Override
- public ContainerData getContainerData() {
+ public KeyValueContainerData getContainerData() {
return containerData;
}
+ @Override
+ public ContainerLifeCycleState getContainerState() {
+ return containerData.getState();
+ }
+
+ @Override
+ public ContainerProtos.ContainerType getContainerType() {
+ return ContainerProtos.ContainerType.KeyValueContainer;
+ }
@Override
public void update(Map<String, String> metadata, boolean forceUpdate)
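
A small sketch of what the narrowed getContainerData() return type and the two new accessors buy callers; kvContainer is an illustrative name for an existing KeyValueContainer:

    // No downcast needed any more: the override narrows ContainerData
    // to KeyValueContainerData.
    KeyValueContainerData data = kvContainer.getContainerData();

    if (kvContainer.getContainerType()
            == ContainerProtos.ContainerType.KeyValueContainer
        && kvContainer.getContainerState() == ContainerLifeCycleState.OPEN) {
      // safe to route KeyValue-specific commands to this container
    }
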
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
new file mode 100644
index 0000000..8da4084
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * This class represents the KeyValueContainer metadata, which is the
+ * in-memory representation of container metadata and is represented on disk
+ * by the .container file.
+ */
+public class KeyValueContainerData extends ContainerData {
+
+ // Path to Container metadata Level DB/RocksDB Store and .container file.
+ private String metadataPath;
+
+ // Path to Physical file system where chunks are stored.
+ private String chunksPath;
+
+ //Type of DB used to store key to chunks mapping
+ private String containerDBType;
+
+ //Number of pending deletion blocks in container.
+ private int numPendingDeletionBlocks;
+
+ private File dbFile = null;
+
+ /**
+ * Constructs KeyValueContainerData object.
+ * @param type - containerType
+ * @param id - ContainerId
+ */
+ public KeyValueContainerData(ContainerProtos.ContainerType type, long id) {
+ super(type, id);
+ this.numPendingDeletionBlocks = 0;
+ }
+
+ /**
+ * Constructs KeyValueContainerData object.
+ * @param type - containerType
+ * @param id - ContainerId
+ * @param layOutVersion
+ */
+ public KeyValueContainerData(ContainerProtos.ContainerType type, long id,
+ int layOutVersion) {
+ super(type, id, layOutVersion);
+ this.numPendingDeletionBlocks = 0;
+ }
+
+
+ /**
+ * Sets Container dbFile. This should be called only during creation of
+ * KeyValue container.
+ * @param containerDbFile
+ */
+ public void setDbFile(File containerDbFile) {
+ dbFile = containerDbFile;
+ }
+
+ /**
+ * Returns container DB file.
+ * @return dbFile
+ */
+ public File getDbFile() {
+ return dbFile;
+ }
+ /**
+ * Returns container metadata path.
+ *
+ * @return - path
+ */
+ public String getMetadataPath() {
+ return metadataPath;
+ }
+
+ /**
+ * Sets container metadata path.
+ *
+ * @param path - String.
+ */
+ public void setMetadataPath(String path) {
+ this.metadataPath = path;
+ }
+
+ /**
+ * Get chunks path.
+ * @return - Physical path where the chunks of this container are stored.
+ */
+ public String getChunksPath() {
+ return chunksPath;
+ }
+
+ /**
+ * Set chunks Path.
+ * @param chunkPath - File path.
+ */
+ public void setChunksPath(String chunkPath) {
+ this.chunksPath = chunkPath;
+ }
+
+ /**
+ * Returns the DBType used for the container.
+ * @return containerDBType
+ */
+ public String getContainerDBType() {
+ return containerDBType;
+ }
+
+ /**
+ * Sets the DBType used for the container.
+ * @param containerDBType
+ */
+ public void setContainerDBType(String containerDBType) {
+ this.containerDBType = containerDBType;
+ }
+
+ /**
+ * Returns the number of pending deletion blocks in container.
+ * @return numPendingDeletionBlocks
+ */
+ public int getNumPendingDeletionBlocks() {
+ return numPendingDeletionBlocks;
+ }
+
+
+ /**
+ * Increase the count of pending deletion blocks.
+ *
+ * @param numBlocks increment number
+ */
+ public void incrPendingDeletionBlocks(int numBlocks) {
+ this.numPendingDeletionBlocks += numBlocks;
+ }
+
+ /**
+ * Decrease the count of pending deletion blocks.
+ *
+ * @param numBlocks decrement number
+ */
+ public void decrPendingDeletionBlocks(int numBlocks) {
+ this.numPendingDeletionBlocks -= numBlocks;
+ }
+
+ /**
+ * Returns a ProtoBuf Message from ContainerData.
+ *
+ * @return Protocol Buffer Message
+ */
+ public ContainerProtos.ContainerData getProtoBufMessage() {
+ ContainerProtos.ContainerData.Builder builder = ContainerProtos
+ .ContainerData.newBuilder();
+ builder.setContainerID(this.getContainerId());
+ builder.setDbPath(this.getDbFile().getPath());
+ builder.setContainerPath(this.getMetadataPath());
+ builder.setState(this.getState());
+
+ for (Map.Entry<String, String> entry : getMetadata().entrySet()) {
+ ContainerProtos.KeyValue.Builder keyValBuilder =
+ ContainerProtos.KeyValue.newBuilder();
+ builder.addMetadata(keyValBuilder.setKey(entry.getKey())
+ .setValue(entry.getValue()).build());
+ }
+
+ if (this.getBytesUsed() >= 0) {
+ builder.setBytesUsed(this.getBytesUsed());
+ }
+
+ if (this.getContainerType() != null) {
+ builder.setContainerType(ContainerProtos.ContainerType.KeyValueContainer);
+ }
+
+ if (this.getContainerDBType() != null) {
+ builder.setContainerDBType(containerDBType);
+ }
+
+ return builder.build();
+ }
+}
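
A minimal construction sketch for the new class. The paths and DB file name are illustrative assumptions; note that setDbFile must be called before getProtoBufMessage(), since the builder dereferences getDbFile().getPath():

    KeyValueContainerData kvData = new KeyValueContainerData(
        ContainerProtos.ContainerType.KeyValueContainer, 42L);
    kvData.setMetadataPath("/data/hdds/scm-uuid/current/containerDir0/42/metadata");
    kvData.setChunksPath("/data/hdds/scm-uuid/current/containerDir0/42/chunks");
    kvData.setContainerDBType("RocksDB");
    kvData.setDbFile(new File(kvData.getMetadataPath(), "42-dn-container.db"));
    kvData.incrPendingDeletionBlocks(3);

    ContainerProtos.ContainerData proto = kvData.getProtoBufMessage();
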
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerLocationUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerLocationUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerLocationUtil.java
deleted file mode 100644
index 2c15c94..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerLocationUtil.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.keyvalue;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.common.Storage;
-
-import java.io.File;
-
-/**
- * Class which provides utility methods for container locations.
- */
-public final class KeyValueContainerLocationUtil {
-
- /* Never constructed. */
- private KeyValueContainerLocationUtil() {
-
- }
- /**
- * Returns Container Metadata Location.
- * @param baseDir
- * @param scmId
- * @param containerId
- * @return containerMetadata Path
- */
- public static File getContainerMetaDataPath(String baseDir, String scmId,
- long containerId) {
- String containerMetaDataPath = getBaseContainerLocation(baseDir, scmId,
- containerId);
- containerMetaDataPath = containerMetaDataPath + File.separator +
- OzoneConsts.CONTAINER_META_PATH;
- return new File(containerMetaDataPath);
- }
-
-
- /**
- * Returns Container Chunks Location.
- * @param baseDir
- * @param scmId
- * @param containerId
- * @return chunksPath
- */
- public static File getChunksLocationPath(String baseDir, String scmId,
- long containerId) {
- String chunksPath = getBaseContainerLocation(baseDir, scmId, containerId)
- + File.separator + OzoneConsts.STORAGE_DIR_CHUNKS;
- return new File(chunksPath);
- }
-
- /**
- * Returns base directory for specified container.
- * @param baseDir
- * @param scmId
- * @param containerId
- * @return base directory for container.
- */
- private static String getBaseContainerLocation(String baseDir, String scmId,
- long containerId) {
- Preconditions.checkNotNull(baseDir, "Base Directory cannot be null");
- Preconditions.checkNotNull(scmId, "scmId cannot be null");
- Preconditions.checkState(containerId >= 0,
- "Container Id cannot be negative.");
-
- String containerSubDirectory = getContainerSubDirectory(containerId);
-
- String containerMetaDataPath = baseDir + File.separator + scmId +
- File.separator + Storage.STORAGE_DIR_CURRENT + File.separator +
- containerSubDirectory + File.separator + containerId;
-
- return containerMetaDataPath;
- }
-
- /**
- * Returns subdirectory, where this container needs to be placed.
- * @param containerId
- * @return container sub directory
- */
- private static String getContainerSubDirectory(long containerId){
- int directory = (int) ((containerId >> 9) & 0xFF);
- return Storage.CONTAINER_DIR + directory;
- }
-
- /**
- * Returns containerFile.
- * @param containerMetaDataPath
- * @param containerName
- * @return .container File name
- */
- public static File getContainerFile(File containerMetaDataPath, String
- containerName) {
- Preconditions.checkNotNull(containerMetaDataPath);
- Preconditions.checkNotNull(containerName);
- return new File(containerMetaDataPath, containerName +
- OzoneConsts.CONTAINER_EXTENSION);
- }
-
- /**
- * Return containerDB File.
- * @param containerMetaDataPath
- * @param containerName
- * @return containerDB File name
- */
- public static File getContainerDBFile(File containerMetaDataPath, String
- containerName) {
- Preconditions.checkNotNull(containerMetaDataPath);
- Preconditions.checkNotNull(containerName);
- return new File(containerMetaDataPath, containerName + OzoneConsts
- .DN_CONTAINER_DB);
- }
-
- /**
- * Returns container checksum file.
- * @param containerMetaDataPath
- * @param containerName
- * @return container checksum file
- */
- public static File getContainerCheckSumFile(File containerMetaDataPath,
- String containerName) {
- Preconditions.checkNotNull(containerMetaDataPath);
- Preconditions.checkNotNull(containerName);
- return new File(containerMetaDataPath, containerName + OzoneConsts
- .CONTAINER_FILE_CHECKSUM_EXTENSION);
- }
-}
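
To make the sharding concrete: getContainerSubDirectory keeps bits 9-16 of the container ID, so each subdirectory holds up to 512 consecutive container IDs. A worked example, assuming Storage.STORAGE_DIR_CURRENT is "current" and Storage.CONTAINER_DIR is "containerDir":

    // (42 >> 9) & 0xFF == 0, so container 42 lands in containerDir0.
    File meta = KeyValueContainerLocationUtil.getContainerMetaDataPath(
        "/data/hdds", "scm-uuid", 42L);
    // => /data/hdds/scm-uuid/current/containerDir0/42/<CONTAINER_META_PATH>

    // (1024 >> 9) & 0xFF == 2, so container 1024 lands in containerDir2.
    File chunks = KeyValueContainerLocationUtil.getChunksLocationPath(
        "/data/hdds", "scm-uuid", 1024L);
    // => /data/hdds/scm-uuid/current/containerDir2/1024/<STORAGE_DIR_CHUNKS>
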
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerUtil.java
deleted file mode 100644
index 55e2ab0..0000000
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerUtil.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.container.keyvalue;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
-
-import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataStoreBuilder;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-
-/**
- * Class which defines utility methods for KeyValueContainer.
- */
-
-public final class KeyValueContainerUtil {
-
- /* Never constructed. */
- private KeyValueContainerUtil() {
-
- }
-
- private static final Logger LOG = LoggerFactory.getLogger(
- KeyValueContainerUtil.class);
-
-
- public static void verifyIsNewContainer(File containerFile) throws
- FileAlreadyExistsException {
- Preconditions.checkNotNull(containerFile, "containerFile should not be " +
- "null");
- if (containerFile.getParentFile().exists()) {
- LOG.error("container already exists on disk. File: {}", containerFile
- .toPath());
- throw new FileAlreadyExistsException("container already exists on " +
- "disk.");
- }
- }
-
- /**
- * Creates the metadata path, chunks path and metadata DB for the specified
- * container.
- *
- * @param containerMetaDataPath
- * @throws IOException
- */
- public static void createContainerMetaData(File containerMetaDataPath, File
- chunksPath, File dbFile, String containerName, Configuration conf) throws
- IOException {
- Preconditions.checkNotNull(containerMetaDataPath);
- Preconditions.checkNotNull(containerName);
- Preconditions.checkNotNull(conf);
-
- if (!containerMetaDataPath.mkdirs()) {
- LOG.error("Unable to create directory for metadata storage. Path: {}",
- containerMetaDataPath);
- throw new IOException("Unable to create directory for metadata storage." +
- " Path: " + containerMetaDataPath);
- }
- MetadataStore store = MetadataStoreBuilder.newBuilder().setConf(conf)
- .setCreateIfMissing(true).setDbFile(dbFile).build();
-
- // We close the store here since the SCM pre-creates containers.
- // We will reopen it and put the DB handle into a cache when keys are
- // created in the container.
-
- store.close();
-
- if (!chunksPath.mkdirs()) {
- LOG.error("Unable to create chunks directory Container {}",
- chunksPath);
- //clean up container metadata path and metadata db
- FileUtils.deleteDirectory(containerMetaDataPath);
- FileUtils.deleteDirectory(containerMetaDataPath.getParentFile());
- throw new IOException("Unable to create directory for data storage." +
- " Path: " + chunksPath);
- }
- }
-
- /**
- * Removes the container if it is empty.
- * <p/>
- * There are three things we need to delete.
- * <p/>
- * 1. Container file and metadata file. 2. The Level DB file 3. The path that
- * we created on the data location.
- *
- * @param containerData - Data of the container to remove.
- * @param conf - configuration of the cluster.
- * @param forceDelete - whether this container should be deleted forcibly.
- * @throws IOException
- */
- public static void removeContainer(KeyValueContainerData containerData,
- Configuration conf, boolean forceDelete)
- throws IOException {
- Preconditions.checkNotNull(containerData);
- File containerMetaDataPath = new File(containerData
- .getMetadataPath());
- File chunksPath = new File(containerData.getChunksPath());
-
- MetadataStore db = KeyUtils.getDB(containerData, conf);
-
- // If the container is not empty and cannot be deleted forcibly,
- // then throw a SCE to stop deleting.
- if (!forceDelete && !db.isEmpty()) {
- throw new StorageContainerException(
- "Container cannot be deleted because it is not empty.",
- ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY);
- }
-
- // Close the DB connection and remove the DB handler from cache
- KeyUtils.removeDB(containerData, conf);
-
- // Delete the Container MetaData path.
- FileUtils.deleteDirectory(containerMetaDataPath);
-
- //Delete the Container Chunks Path.
- FileUtils.deleteDirectory(chunksPath);
-
- //Delete Container directory
- FileUtils.deleteDirectory(containerMetaDataPath.getParentFile());
-
- }
-}
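
The create/remove pairing this utility enforced, sketched end to end with illustrative names; removeContainer refuses a non-empty container unless forceDelete is set:

    // Creation: fails fast if the container directory already exists on disk.
    KeyValueContainerUtil.verifyIsNewContainer(containerFile);
    KeyValueContainerUtil.createContainerMetaData(
        containerMetaDataPath, chunksPath, dbFile, "42", conf);

    // Removal: throws ERROR_CONTAINER_NOT_EMPTY unless the DB is empty
    // or forceDelete is true.
    KeyValueContainerUtil.removeContainer(containerData, conf, false);
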
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
new file mode 100644
index 0000000..d9ee7fd
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -0,0 +1,643 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import com.sun.jersey.spi.resource.Singleton;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .CreateContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .GetSmallFileRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .KeyValue;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .PutSmallFileRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .Type;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+ .StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.SmallFileUtils;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume
+ .RoundRobinVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils;
+import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl;
+import org.apache.hadoop.ozone.container.keyvalue.impl.KeyManagerImpl;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.KeyManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .Result.*;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .Stage;
+
+/**
+ * Handler for KeyValue Container type.
+ */
+@Singleton
+public class KeyValueHandler extends Handler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ KeyValueHandler.class);
+
+ private static volatile KeyValueHandler INSTANCE = null; // Singleton class
+
+ private final ContainerType containerType;
+ private final KeyManager keyManager;
+ private final ChunkManager chunkManager;
+ private VolumeChoosingPolicy volumeChoosingPolicy;
+
+ // TODO : Add metrics and populate it.
+
+ public static synchronized KeyValueHandler getInstance(Configuration config,
+ ContainerSet contSet, VolumeSet volSet, String scmID) {
+ if (INSTANCE == null) {
+ INSTANCE = new KeyValueHandler(config, contSet, volSet, scmID);
+ }
+ return INSTANCE;
+ }
+
+ private KeyValueHandler(Configuration config, ContainerSet contSet,
+ VolumeSet volSet, String scmID) {
+ super(config, contSet, volSet, scmID);
+ containerType = ContainerType.KeyValueContainer;
+ keyManager = new KeyManagerImpl(config);
+ chunkManager = new ChunkManagerImpl();
+ // TODO: Add support for different volumeChoosingPolicies.
+ volumeChoosingPolicy = new RoundRobinVolumeChoosingPolicy();
+ }
+
+ @Override
+ public ContainerCommandResponseProto handle(
+ ContainerCommandRequestProto request, Container container) {
+
+ Type cmdType = request.getCmdType();
+ KeyValueContainer kvContainer = (KeyValueContainer) container;
+ switch(cmdType) {
+ case CreateContainer:
+ return handleCreateContainer(request, kvContainer);
+ case ReadContainer:
+ return handleReadContainer(request, kvContainer);
+ case UpdateContainer:
+ return handleUpdateContainer(request, kvContainer);
+ case DeleteContainer:
+ return handleDeleteContainer(request, kvContainer);
+ case ListContainer:
+ return handleUnsupportedOp(request);
+ case CloseContainer:
+ return handleCloseContainer(request, kvContainer);
+ case PutKey:
+ return handlePutKey(request, kvContainer);
+ case GetKey:
+ return handleGetKey(request, kvContainer);
+ case DeleteKey:
+ return handleDeleteKey(request, kvContainer);
+ case ListKey:
+ return handleUnsupportedOp(request);
+ case ReadChunk:
+ return handleReadChunk(request, kvContainer);
+ case DeleteChunk:
+ return handleDeleteChunk(request, kvContainer);
+ case WriteChunk:
+ return handleWriteChunk(request, kvContainer);
+ case ListChunk:
+ return handleUnsupportedOp(request);
+ case CompactChunk:
+ return handleUnsupportedOp(request);
+ case PutSmallFile:
+ return handlePutSmallFile(request, kvContainer);
+ case GetSmallFile:
+ return handleGetSmallFile(request, kvContainer);
+ }
+
+ return null;
+ }
+
+ /**
+ * Handles Create Container Request. If successful, adds the container to
+ * ContainerSet.
+ */
+ ContainerCommandResponseProto handleCreateContainer(
+ ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+ if (!request.hasCreateContainer()) {
+ LOG.debug("Malformed Create Container request. trace ID: {}",
+ request.getTraceID());
+ return ContainerUtils.malformedRequest(request);
+ }
+ // Create Container request should be passed a null container as the
+ // container would be created here.
+ Preconditions.checkArgument(kvContainer == null);
+
+ CreateContainerRequestProto createContainerReq =
+ request.getCreateContainer();
+ long containerID = createContainerReq.getContainerID();
+ if (createContainerReq.hasContainerType()) {
+ Preconditions.checkArgument(createContainerReq.getContainerType()
+ .equals(ContainerType.KeyValueContainer));
+ }
+
+ KeyValueContainerData newContainerData = new KeyValueContainerData(
+ containerType, containerID);
+ // TODO: Add support to add metadataList to ContainerData. Add metadata
+ // to container during creation.
+ KeyValueContainer newContainer = new KeyValueContainer(
+ newContainerData, conf);
+
+ try {
+ newContainer.create(volumeSet, volumeChoosingPolicy, scmID);
+ containerSet.addContainer(newContainer);
+ } catch (StorageContainerException ex) {
+ return ContainerUtils.logAndReturnError(LOG, ex, request);
+ }
+
+ return ContainerUtils.getSuccessResponse(request);
+ }
+
+ /**
+ * Handles Read Container Request. Returns the ContainerData as response.
+ */
+ ContainerCommandResponseProto handleReadContainer(
+ ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+ if (!request.hasReadContainer()) {
+ LOG.debug("Malformed Read Container request. trace ID: {}",
+ request.getTraceID());
+ return ContainerUtils.malformedRequest(request);
+ }
+
+ KeyValueContainerData containerData = kvContainer.getContainerData();
+ return KeyValueContainerUtil.getReadContainerResponse(
+ request, containerData);
+ }
+
+
+ /**
+ * Handles Update Container Request. If successful, the container metadata
+ * is updated.
+ */
+ ContainerCommandResponseProto handleUpdateContainer(
+ ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+ if (!request.hasUpdateContainer()) {
+ LOG.debug("Malformed Update Container request. trace ID: {}",
+ request.getTraceID());
+ return ContainerUtils.malformedRequest(request);
+ }
+
+ boolean forceUpdate = request.getUpdateContainer().getForceUpdate();
+ List<KeyValue> keyValueList =
+ request.getUpdateContainer().getMetadataList();
+ Map<String, String> metadata = new HashMap<>();
+ for (KeyValue keyValue : keyValueList) {
+ metadata.put(keyValue.getKey(), keyValue.getValue());
+ }
+
+ try {
+ if (!metadata.isEmpty()) {
+ kvContainer.update(metadata, forceUpdate);
+ }
+ } catch (StorageContainerException ex) {
+ return ContainerUtils.logAndReturnError(LOG, ex, request);
+ }
+ return ContainerUtils.getSuccessResponse(request);
+ }
+
+ /**
+ * Handles Delete Container Request.
+ * Open containers cannot be deleted.
+ * Holds a write lock on the container till it is removed from the
+ * containerMap. On-disk deletion of the container files happens
+ * asynchronously, without holding the lock.
+ */
+ ContainerCommandResponseProto handleDeleteContainer(
+ ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+ if (!request.hasDeleteContainer()) {
+ LOG.debug("Malformed Delete container request. trace ID: {}",
+ request.getTraceID());
+ return ContainerUtils.malformedRequest(request);
+ }
+
+ boolean forceDelete = request.getDeleteContainer().getForceDelete();
+ kvContainer.writeLock();
+
+ try {
+ // Check if container is open
+ if (kvContainer.getContainerData().isOpen()) {
+ kvContainer.writeUnlock();
+ throw new StorageContainerException(
+ "Deletion of Open Container is not allowed.",
+ DELETE_ON_OPEN_CONTAINER);
+ } else {
+ containerSet.removeContainer(
+ kvContainer.getContainerData().getContainerId());
+ // Release the lock first.
+ // Avoid holding write locks for disk operations
+ kvContainer.writeUnlock();
+
+ kvContainer.delete(forceDelete);
+ }
+ } catch (StorageContainerException ex) {
+ return ContainerUtils.logAndReturnError(LOG, ex, request);
+ } finally {
+ if (kvContainer.hasWriteLock()) {
+ kvContainer.writeUnlock();
+ }
+ }
+ return ContainerUtils.getSuccessResponse(request);
+ }
+
+ /**
+ * Handles Close Container Request. An open container is closed.
+ */
+ ContainerCommandResponseProto handleCloseContainer(
+ ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+ if (!request.hasCloseContainer()) {
+ LOG.debug("Malformed Update Container request. trace ID: {}",
+ request.getTraceID());
+ return ContainerUtils.malformedRequest(request);
+ }
+
+ try {
+ checkContainerOpen(kvContainer);
+
+ kvContainer.close();
+ } catch (StorageContainerException ex) {
+ return ContainerUtils.logAndReturnError(LOG, ex, request);
+ }
+
+ return ContainerUtils.getSuccessResponse(request);
+ }
+
+ /**
+ * Handle Put Key operation. Calls KeyManager to process the request.
+ */
+ ContainerCommandResponseProto handlePutKey(
+ ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+ if (!request.hasPutKey()) {
+ LOG.debug("Malformed Put Key request. trace ID: {}",
+ request.getTraceID());
+ return ContainerUtils.malformedRequest(request);
+ }
+
+ try {
+ checkContainerOpen(kvContainer);
+
+ KeyData keyData = KeyData.getFromProtoBuf(
+ request.getPutKey().getKeyData());
+ Preconditions.checkNotNull(keyData);
+
+ keyManager.putKey(kvContainer, keyData);
+ } catch (StorageContainerException ex) {
+ return ContainerUtils.logAndReturnError(LOG, ex, request);
+ } catch (IOException ex) {
+ return ContainerUtils.logAndReturnError(LOG,
+ new StorageContainerException("Put Key failed", ex, IO_EXCEPTION),
+ request);
+ }
+
+ return KeyUtils.getKeyResponseSuccess(request);
+ }
+
+ /**
+ * Handle Get Key operation. Calls KeyManager to process the request.
+ */
+ ContainerCommandResponseProto handleGetKey(
+ ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+ if (!request.hasGetKey()) {
+ LOG.debug("Malformed Get Key request. trace ID: {}",
+ request.getTraceID());
+ return ContainerUtils.malformedRequest(request);
+ }
+
+ KeyData responseData;
+ try {
+ BlockID blockID = BlockID.getFromProtobuf(
+ request.getGetKey().getBlockID());
+ responseData = keyManager.getKey(kvContainer, blockID);
+
+ } catch (StorageContainerException ex) {
+ return ContainerUtils.logAndReturnError(LOG, ex, request);
+ } catch (IOException ex) {
+ return ContainerUtils.logAndReturnError(LOG,
+ new StorageContainerException("Get Key failed", ex, IO_EXCEPTION),
+ request);
+ }
+
+ return KeyUtils.getKeyDataResponse(request, responseData);
+ }
+
+ /**
+ * Handle Delete Key operation. Calls KeyManager to process the request.
+ */
+ ContainerCommandResponseProto handleDeleteKey(
+ ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+ if (!request.hasDeleteKey()) {
+ LOG.debug("Malformed Delete Key request. trace ID: {}",
+ request.getTraceID());
+ return ContainerUtils.malformedRequest(request);
+ }
+
+ try {
+ checkContainerOpen(kvContainer);
+
+ BlockID blockID = BlockID.getFromProtobuf(
+ request.getDeleteKey().getBlockID());
+
+ keyManager.deleteKey(kvContainer, blockID);
+ } catch (StorageContainerException ex) {
+ return ContainerUtils.logAndReturnError(LOG, ex, request);
+ } catch (IOException ex) {
+ return ContainerUtils.logAndReturnError(LOG,
+ new StorageContainerException("Delete Key failed", ex, IO_EXCEPTION),
+ request);
+ }
+
+ return KeyUtils.getKeyResponseSuccess(request);
+ }
+
+ /**
+ * Handle Read Chunk operation. Calls ChunkManager to process the request.
+ */
+ ContainerCommandResponseProto handleReadChunk(
+ ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+ if (!request.hasReadChunk()) {
+ LOG.debug("Malformed Read Chunk request. trace ID: {}",
+ request.getTraceID());
+ return ContainerUtils.malformedRequest(request);
+ }
+
+ ChunkInfo chunkInfo;
+ byte[] data;
+ try {
+ BlockID blockID = BlockID.getFromProtobuf(
+ request.getReadChunk().getBlockID());
+ chunkInfo = ChunkInfo.getFromProtoBuf(request.getReadChunk()
+ .getChunkData());
+ Preconditions.checkNotNull(chunkInfo);
+
+ data = chunkManager.readChunk(kvContainer, blockID, chunkInfo);
+ } catch (StorageContainerException ex) {
+ return ContainerUtils.logAndReturnError(LOG, ex, request);
+ } catch (IOException ex) {
+ return ContainerUtils.logAndReturnError(LOG,
+ new StorageContainerException("Read Chunk failed", ex, IO_EXCEPTION),
+ request);
+ }
+
+ return ChunkUtils.getReadChunkResponse(request, data, chunkInfo);
+ }
+
+ /**
+ * Handle Delete Chunk operation. Calls ChunkManager to process the request.
+ */
+ ContainerCommandResponseProto handleDeleteChunk(
+ ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+ if (!request.hasDeleteChunk()) {
+ LOG.debug("Malformed Delete Chunk request. trace ID: {}",
+ request.getTraceID());
+ return ContainerUtils.malformedRequest(request);
+ }
+
+ try {
+ checkContainerOpen(kvContainer);
+
+ BlockID blockID = BlockID.getFromProtobuf(
+ request.getDeleteChunk().getBlockID());
+ ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(request.getDeleteChunk()
+ .getChunkData());
+ Preconditions.checkNotNull(chunkInfo);
+
+ chunkManager.deleteChunk(kvContainer, blockID, chunkInfo);
+ } catch (StorageContainerException ex) {
+ return ContainerUtils.logAndReturnError(LOG, ex, request);
+ } catch (IOException ex) {
+ return ContainerUtils.logAndReturnError(LOG,
+ new StorageContainerException("Delete Chunk failed", ex,
+ IO_EXCEPTION), request);
+ }
+
+ return ChunkUtils.getChunkResponseSuccess(request);
+ }
+
+ /**
+ * Handle Write Chunk operation. Calls ChunkManager to process the request.
+ */
+ ContainerCommandResponseProto handleWriteChunk(
+ ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+ if (!request.hasWriteChunk()) {
+ LOG.debug("Malformed Write Chunk request. trace ID: {}",
+ request.getTraceID());
+ return ContainerUtils.malformedRequest(request);
+ }
+
+ try {
+ checkContainerOpen(kvContainer);
+
+ BlockID blockID = BlockID.getFromProtobuf(
+ request.getWriteChunk().getBlockID());
+ ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(request.getWriteChunk()
+ .getChunkData());
+ Preconditions.checkNotNull(chunkInfo);
+
+ byte[] data = null;
+ if (request.getWriteChunk().getStage() == Stage.WRITE_DATA ||
+ request.getWriteChunk().getStage() == Stage.COMBINED) {
+ data = request.getWriteChunk().getData().toByteArray();
+ }
+
+ chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data,
+ request.getWriteChunk().getStage());
+ } catch (StorageContainerException ex) {
+ return ContainerUtils.logAndReturnError(LOG, ex, request);
+ } catch (IOException ex) {
+ return ContainerUtils.logAndReturnError(LOG,
+ new StorageContainerException("Write Chunk failed", ex, IO_EXCEPTION),
+ request);
+ }
+
+ return ChunkUtils.getChunkResponseSuccess(request);
+ }
+
+ /**
+ * Handle Put Small File operation. Writes the chunk and associated key
+ * using a single RPC. Calls KeyManager and ChunkManager to process the
+ * request.
+ */
+ ContainerCommandResponseProto handlePutSmallFile(
+ ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+ if (!request.hasPutSmallFile()) {
+ LOG.debug("Malformed Put Small File request. trace ID: {}",
+ request.getTraceID());
+ return ContainerUtils.malformedRequest(request);
+ }
+ PutSmallFileRequestProto putSmallFileReq =
+ request.getPutSmallFile();
+
+ try {
+ checkContainerOpen(kvContainer);
+
+ BlockID blockID = BlockID.getFromProtobuf(
+ putSmallFileReq.getKey().getKeyData().getBlockID());
+ KeyData keyData = KeyData.getFromProtoBuf(
+ putSmallFileReq.getKey().getKeyData());
+ Preconditions.checkNotNull(keyData);
+
+ ChunkInfo chunkInfo = ChunkInfo.getFromProtoBuf(
+ putSmallFileReq.getChunkInfo());
+ Preconditions.checkNotNull(chunkInfo);
+
+ byte[] data = putSmallFileReq.getData().toByteArray();
+ chunkManager.writeChunk(
+ kvContainer, blockID, chunkInfo, data, Stage.COMBINED);
+
+ List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
+ chunks.add(chunkInfo.getProtoBufMessage());
+ keyData.setChunks(chunks);
+ keyManager.putKey(kvContainer, keyData);
+
+ } catch (StorageContainerException ex) {
+ return ContainerUtils.logAndReturnError(LOG, ex, request);
+ } catch (IOException ex) {
+ return ContainerUtils.logAndReturnError(LOG,
+ new StorageContainerException("Read Chunk failed", ex,
+ PUT_SMALL_FILE_ERROR), request);
+ }
+
+ return SmallFileUtils.getPutFileResponseSuccess(request);
+ }
+
+ /**
+ * Handle Get Small File operation. Gets a data stream using a key. This
+ * helps in reducing the RPC overhead for small files. Calls KeyManager and
+ * ChunkManager to process the request.
+ */
+ ContainerCommandResponseProto handleGetSmallFile(
+ ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+ if (!request.hasGetSmallFile()) {
+ LOG.debug("Malformed Get Small File request. trace ID: {}",
+ request.getTraceID());
+ return ContainerUtils.malformedRequest(request);
+ }
+
+ GetSmallFileRequestProto getSmallFileReq = request.getGetSmallFile();
+
+ try {
+ BlockID blockID = BlockID.getFromProtobuf(
+ getSmallFileReq.getKey().getBlockID());
+ KeyData responseData = keyManager.getKey(kvContainer, blockID);
+
+ ContainerProtos.ChunkInfo chunkInfo = null;
+ ByteString dataBuf = ByteString.EMPTY;
+ for (ContainerProtos.ChunkInfo chunk : responseData.getChunks()) {
+ byte[] data = chunkManager.readChunk(kvContainer, blockID,
+ ChunkInfo.getFromProtoBuf(chunk));
+ ByteString current = ByteString.copyFrom(data);
+ dataBuf = dataBuf.concat(current);
+ chunkInfo = chunk;
+ }
+
+ return SmallFileUtils.getGetSmallFileResponseSuccess(request, dataBuf
+ .toByteArray(), ChunkInfo.getFromProtoBuf(chunkInfo));
+ } catch (StorageContainerException e) {
+ return ContainerUtils.logAndReturnError(LOG, e, request);
+ } catch (IOException ex) {
+ return ContainerUtils.logAndReturnError(LOG,
+ new StorageContainerException("Write Chunk failed", ex,
+ GET_SMALL_FILE_ERROR), request);
+ }
+ }
+
+ /**
+ * Handle unsupported operation.
+ */
+ ContainerCommandResponseProto handleUnsupportedOp(
+ ContainerCommandRequestProto request) {
+ // TODO : remove all unsupported operations or handle them.
+ return ContainerUtils.unsupportedRequest(request);
+ }
+
+ /**
+ * Check if container is open. Throw exception otherwise.
+ * @param kvContainer
+ * @throws StorageContainerException
+ */
+ private void checkContainerOpen(KeyValueContainer kvContainer)
+ throws StorageContainerException {
+
+ ContainerProtos.ContainerLifeCycleState containerState =
+ kvContainer.getContainerState();
+
+ if (containerState == ContainerProtos.ContainerLifeCycleState.OPEN) {
+ return;
+ } else {
+ String msg = "Requested operation not allowed as ContainerState is " +
+ containerState;
+ ContainerProtos.Result result = null;
+ switch (containerState) {
+ case CLOSING:
+ case CLOSED:
+ result = CLOSED_CONTAINER_IO;
+ break;
+ case INVALID:
+ result = INVALID_CONTAINER_STATE;
+ break;
+ default:
+ result = CONTAINER_INTERNAL_ERROR;
+ }
+
+ throw new StorageContainerException(msg, result);
+ }
+ }
+}
\ No newline at end of file
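
How a dispatcher is expected to drive this handler, with illustrative names. Note the precondition in handleCreateContainer: a CreateContainer command must be handed a null container, because the container is created inside the handler and then added to the ContainerSet:

    KeyValueHandler handler = KeyValueHandler.getInstance(
        conf, containerSet, volumeSet, scmId);

    // CreateContainer: the container argument must be null.
    ContainerCommandResponseProto createResp =
        handler.handle(createRequest, null);

    // Every other command type: pass the already-resolved container.
    ContainerCommandResponseProto readResp =
        handler.handle(readChunkRequest, container);
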
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueYaml.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueYaml.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueYaml.java
new file mode 100644
index 0000000..64f7152
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueYaml.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.keyvalue;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.yaml.snakeyaml.Yaml;
+
+import java.beans.IntrospectionException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Writer;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.OutputStreamWriter;
+import java.io.File;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.Map;
+
+import org.yaml.snakeyaml.constructor.AbstractConstruct;
+import org.yaml.snakeyaml.constructor.Constructor;
+import org.yaml.snakeyaml.introspector.BeanAccess;
+import org.yaml.snakeyaml.introspector.Property;
+import org.yaml.snakeyaml.introspector.PropertyUtils;
+import org.yaml.snakeyaml.nodes.MappingNode;
+import org.yaml.snakeyaml.nodes.Node;
+import org.yaml.snakeyaml.nodes.ScalarNode;
+import org.yaml.snakeyaml.nodes.Tag;
+import org.yaml.snakeyaml.representer.Representer;
+
+/**
+ * Class for creating and reading .container files.
+ */
+
+public final class KeyValueYaml {
+
+ private KeyValueYaml() {
+
+ }
+ /**
+ * Creates a .container file in yaml format.
+ *
+ * @param containerFile
+ * @param containerData
+ * @throws IOException
+ */
+ public static void createContainerFile(File containerFile, ContainerData
+ containerData) throws IOException {
+
+ Preconditions.checkNotNull(containerFile, "yamlFile cannot be null");
+ Preconditions.checkNotNull(containerData, "containerData cannot be null");
+
+ PropertyUtils propertyUtils = new PropertyUtils();
+ propertyUtils.setBeanAccess(BeanAccess.FIELD);
+ propertyUtils.setAllowReadOnlyProperties(true);
+
+ Representer representer = new KeyValueContainerDataRepresenter();
+ representer.setPropertyUtils(propertyUtils);
+ representer.addClassTag(
+ KeyValueContainerData.class, new Tag("KeyValueContainerData"));
+
+ Constructor keyValueDataConstructor = new KeyValueDataConstructor();
+
+ Yaml yaml = new Yaml(keyValueDataConstructor, representer);
+
+ Writer writer = new OutputStreamWriter(new FileOutputStream(containerFile),
+ "UTF-8");
+ yaml.dump(containerData, writer);
+ writer.close();
+ }
+
+ /**
+ * Reads the yaml file and returns the containerData.
+ *
+ * @param containerFile
+ * @throws IOException
+ */
+ public static KeyValueContainerData readContainerFile(File containerFile)
+ throws IOException {
+ Preconditions.checkNotNull(containerFile, "containerFile cannot be null");
+
+ InputStream input = null;
+ KeyValueContainerData keyValueContainerData;
+ try {
+ PropertyUtils propertyUtils = new PropertyUtils();
+ propertyUtils.setBeanAccess(BeanAccess.FIELD);
+ propertyUtils.setAllowReadOnlyProperties(true);
+
+ Representer representer = new KeyValueContainerDataRepresenter();
+ representer.setPropertyUtils(propertyUtils);
+ representer.addClassTag(
+ KeyValueContainerData.class, new Tag("KeyValueContainerData"));
+
+ Constructor keyValueDataConstructor = new KeyValueDataConstructor();
+
+ Yaml yaml = new Yaml(keyValueDataConstructor, representer);
+ yaml.setBeanAccess(BeanAccess.FIELD);
+
+ input = new FileInputStream(containerFile);
+ keyValueContainerData = (KeyValueContainerData)
+ yaml.load(input);
+ } finally {
+ if (input != null) {
+ input.close();
+ }
+ }
+ return keyValueContainerData;
+ }
+
+ /**
+ * Representer class to define which fields need to be stored in yaml file.
+ */
+ private static class KeyValueContainerDataRepresenter extends Representer {
+ @Override
+ protected Set<Property> getProperties(Class<? extends Object> type)
+ throws IntrospectionException {
+ Set<Property> set = super.getProperties(type);
+ Set<Property> filtered = new TreeSet<Property>();
+ if (type.equals(KeyValueContainerData.class)) {
+ // filter properties
+ for (Property prop : set) {
+ String name = prop.getName();
+ // When a new field needs to be added, it needs to be added here.
+ if (name.equals("containerType") || name.equals("containerId") ||
+ name.equals("layOutVersion") || name.equals("state") ||
+ name.equals("metadata") || name.equals("metadataPath") ||
+ name.equals("chunksPath") || name.equals(
+ "containerDBType")) {
+ filtered.add(prop);
+ }
+ }
+ }
+ return filtered;
+ }
+ }
+
+ /**
+ * Constructor class for KeyValueData, which will be used by Yaml.
+ */
+ private static class KeyValueDataConstructor extends Constructor {
+ KeyValueDataConstructor() {
+ //Adding our own specific constructors for tags.
+ this.yamlConstructors.put(new Tag("KeyValueContainerData"),
+ new ConstructKeyValueContainerData());
+ this.yamlConstructors.put(Tag.INT, new ConstructLong());
+ }
+
+ private class ConstructKeyValueContainerData extends AbstractConstruct {
+ public Object construct(Node node) {
+ MappingNode mnode = (MappingNode) node;
+ Map<Object, Object> nodes = constructMapping(mnode);
+ String type = (String) nodes.get("containerType");
+
+ ContainerProtos.ContainerType containerType = ContainerProtos
+ .ContainerType.KeyValueContainer;
+ if (type.equals("KeyValueContainer")) {
+ containerType = ContainerProtos.ContainerType.KeyValueContainer;
+ }
+
+ // TAG.INT values are constructed as Longs (see ConstructLong below),
+ // so narrow layOutVersion back to the int the constructor expects.
+ long layOutVersion = (long) nodes.get("layOutVersion");
+ int lv = (int) layOutVersion;
+
+ //When a new field is added, it needs to be added here.
+ KeyValueContainerData kvData = new KeyValueContainerData(containerType,
+ (long) nodes.get("containerId"), lv);
+ kvData.setContainerDBType((String)nodes.get("containerDBType"));
+ kvData.setMetadataPath((String) nodes.get(
+ "metadataPath"));
+ kvData.setChunksPath((String) nodes.get("chunksPath"));
+ Map<String, String> meta = (Map) nodes.get("metadata");
+ meta.forEach((key, val) -> {
+ try {
+ kvData.addMetadata(key, val);
+ } catch (IOException e) {
+ throw new IllegalStateException("Unexpected " +
+ "Key Value Pair (" + key + "," + val + ") in the metadata " +
+ "for containerId " + (long) nodes.get("containerId"));
+ }
+ });
+ String state = (String) nodes.get("state");
+ switch (state) {
+ case "OPEN":
+ kvData.setState(ContainerProtos.ContainerLifeCycleState.OPEN);
+ break;
+ case "CLOSING":
+ kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSING);
+ break;
+ case "CLOSED":
+ kvData.setState(ContainerProtos.ContainerLifeCycleState.CLOSED);
+ break;
+ default:
+ throw new IllegalStateException("Unexpected " +
+ "ContainerLifeCycleState " + state + " for the containerId " +
+ (long) nodes.get("containerId"));
+ }
+ return kvData;
+ }
+ }
+
+ // The code below is adapted from SnakeYAML, which normally returns an
+ // Integer when the value fits and a Long otherwise. It is slightly
+ // modified here to return a Long in all cases.
+ private class ConstructLong extends AbstractConstruct {
+ public Object construct(Node node) {
+ String value = constructScalar((ScalarNode) node).toString()
+ .replaceAll("_", "");
+ int sign = +1;
+ char first = value.charAt(0);
+ if (first == '-') {
+ sign = -1;
+ value = value.substring(1);
+ } else if (first == '+') {
+ value = value.substring(1);
+ }
+ int base = 10;
+ if ("0".equals(value)) {
+ return Long.valueOf(0);
+ } else if (value.startsWith("0b")) {
+ value = value.substring(2);
+ base = 2;
+ } else if (value.startsWith("0x")) {
+ value = value.substring(2);
+ base = 16;
+ } else if (value.startsWith("0")) {
+ value = value.substring(1);
+ base = 8;
+ } else if (value.indexOf(':') != -1) {
+ String[] digits = value.split(":");
+ int bes = 1;
+ int val = 0;
+ for (int i = 0, j = digits.length; i < j; i++) {
+ val += (Long.parseLong(digits[(j - i) - 1]) * bes);
+ bes *= 60;
+ }
+ return createNumber(sign, String.valueOf(val), 10);
+ } else {
+ return createNumber(sign, value, 10);
+ }
+ return createNumber(sign, value, base);
+ }
+ }
+
+ private Number createNumber(int sign, String number, int radix) {
+ Number result;
+ if (sign < 0) {
+ number = "-" + number;
+ }
+ result = Long.valueOf(number, radix);
+ return result;
+ }
+ }
+
+}
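A minimal round-trip sketch of how KeyValueYaml is meant to be used; the
container id, layout version, and paths below are illustrative values, not
taken from this patch:

  KeyValueContainerData kvData = new KeyValueContainerData(
      ContainerProtos.ContainerType.KeyValueContainer, 10L, 1);
  kvData.setMetadataPath("/data/hdds/scm-uuid/current/containerDir0/10/metadata");
  kvData.setChunksPath("/data/hdds/scm-uuid/current/containerDir0/10/chunks");
  kvData.setContainerDBType("RocksDB");
  kvData.setState(ContainerProtos.ContainerLifeCycleState.OPEN);

  // Dump to a .container file, then parse it back; only the fields listed
  // in KeyValueContainerDataRepresenter survive the round trip.
  File containerFile = new File(kvData.getMetadataPath(), "10.container");
  KeyValueYaml.createContainerFile(containerFile, kvData);
  KeyValueContainerData parsed = KeyValueYaml.readContainerFile(containerFile);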
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
index c837ccc..872d84d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
@@ -21,12 +21,21 @@ package org.apache.hadoop.ozone.container.keyvalue.helpers;
import com.google.common.base.Preconditions;
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ReadChunkResponseProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+ .StorageContainerException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -292,4 +301,41 @@ public final class ChunkUtils {
(Boolean.valueOf(overWrite));
}
+ /**
+ * Returns a successful chunk operation response. This call is used by
+ * chunk operations, such as write and delete chunk, which have empty
+ * success responses.
+ *
+ * @param msg Request
+ * @return Response.
+ */
+ public static ContainerCommandResponseProto getChunkResponseSuccess(
+ ContainerCommandRequestProto msg) {
+ return ContainerUtils.getSuccessResponse(msg);
+ }
+
+ /**
+ * Gets a response to the read chunk calls.
+ *
+ * @param msg - Msg
+ * @param data - Data
+ * @param info - Info
+ * @return Response.
+ */
+ public static ContainerCommandResponseProto getReadChunkResponse(
+ ContainerCommandRequestProto msg, byte[] data, ChunkInfo info) {
+ Preconditions.checkNotNull(msg);
+ Preconditions.checkNotNull("Chunk data is null", data);
+ Preconditions.checkNotNull("Chunk Info is null", info);
+
+ ReadChunkResponseProto.Builder response =
+ ReadChunkResponseProto.newBuilder();
+ response.setChunkData(info.getProtoBufMessage());
+ response.setData(ByteString.copyFrom(data));
+ response.setBlockID(msg.getReadChunk().getBlockID());
+
+ ContainerCommandResponseProto.Builder builder =
+ ContainerUtils.getSuccessResponseBuilder(msg);
+ builder.setReadChunk(response);
+ return builder.build();
+ }
}
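A hedged sketch of how a Handler might pair these helpers once dispatch
lands here; chunkManager, kvContainer, blockID and chunkInfo are assumed
names for illustration, not part of this patch:

  // Read path: fetch the chunk bytes from disk, then wrap them along with
  // the chunk info and block id in a ReadChunk response.
  byte[] data = chunkManager.readChunk(kvContainer, blockID, chunkInfo);
  return ChunkUtils.getReadChunkResponse(request, data, chunkInfo);

For write and delete chunk, which carry no payload on success, the handler
would instead return ChunkUtils.getChunkResponseSuccess(request).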
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
index d45f598..714f445 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyUtils.java
@@ -21,9 +21,17 @@ package org.apache.hadoop.ozone.container.keyvalue.helpers;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .GetKeyResponseProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+ .StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.helpers.KeyData;
-import org.apache.hadoop.ozone.container.common.impl.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
import org.apache.hadoop.utils.MetadataStore;
@@ -112,4 +120,27 @@ public final class KeyUtils {
" bytes array.", NO_SUCH_KEY);
}
}
+
+ /**
+ * Returns a successful key operation response.
+ * @param msg - Request.
+ * @return Response.
+ */
+ public static ContainerCommandResponseProto getKeyResponseSuccess(
+ ContainerCommandRequestProto msg) {
+ return ContainerUtils.getSuccessResponse(msg);
+ }
+
+ /**
+ * Returns a GetKey response carrying the requested KeyData.
+ * @param msg - Request.
+ * @param data - KeyData for the requested key.
+ * @return Response.
+ */
+ public static ContainerCommandResponseProto getKeyDataResponse(
+ ContainerCommandRequestProto msg, KeyData data) {
+ GetKeyResponseProto.Builder getKey = ContainerProtos
+ .GetKeyResponseProto
+ .newBuilder();
+ getKey.setKeyData(data.getProtoBufMessage());
+ ContainerProtos.ContainerCommandResponseProto.Builder builder =
+ ContainerUtils.getSuccessResponseBuilder(msg);
+ builder.setGetKey(getKey);
+ return builder.build();
+ }
}
\ No newline at end of file
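Analogously, a GetKey handler would be expected to use these as follows;
keyManager, kvContainer and requestedKey are assumed names for
illustration:

  // Look up the KeyData for the requested block and wrap it in a
  // success response carrying the GetKeyResponseProto payload.
  KeyData responseData = keyManager.getKey(kvContainer, requestedKey);
  return KeyUtils.getKeyDataResponse(request, responseData);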
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java
new file mode 100644
index 0000000..4710c51
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerLocationUtil.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.keyvalue.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.Storage;
+
+import java.io.File;
+
+/**
+ * Class which provides utility methods for container locations.
+ */
+public final class KeyValueContainerLocationUtil {
+
+ /* Never constructed. */
+ private KeyValueContainerLocationUtil() {
+
+ }
+ /**
+ * Returns Container Metadata Location.
+ * @param baseDir - base directory of the hdds volume
+ * @param scmId - SCM id
+ * @param containerId - container id
+ * @return containerMetadata Path
+ */
+ public static File getContainerMetaDataPath(String baseDir, String scmId,
+ long containerId) {
+ String containerMetaDataPath = getBaseContainerLocation(baseDir, scmId,
+ containerId);
+ containerMetaDataPath = containerMetaDataPath + File.separator +
+ OzoneConsts.CONTAINER_META_PATH;
+ return new File(containerMetaDataPath);
+ }
+
+
+ /**
+ * Returns Container Chunks Location.
+ * @param baseDir - base directory of the hdds volume
+ * @param scmId - SCM id
+ * @param containerId - container id
+ * @return chunksPath
+ */
+ public static File getChunksLocationPath(String baseDir, String scmId,
+ long containerId) {
+ String chunksPath = getBaseContainerLocation(baseDir, scmId, containerId)
+ + File.separator + OzoneConsts.STORAGE_DIR_CHUNKS;
+ return new File(chunksPath);
+ }
+
+ /**
+ * Returns base directory for the specified container.
+ * @param baseDir - base directory of the hdds volume
+ * @param scmId - SCM id
+ * @param containerId - container id
+ * @return base directory for container.
+ */
+ private static String getBaseContainerLocation(String baseDir, String scmId,
+ long containerId) {
+ Preconditions.checkNotNull(baseDir, "Base Directory cannot be null");
+ Preconditions.checkNotNull(scmId, "scmId cannot be null");
+ Preconditions.checkState(containerId >= 0,
+ "Container Id cannot be negative.");
+
+ String containerSubDirectory = getContainerSubDirectory(containerId);
+
+ String containerBaseDir = baseDir + File.separator + scmId +
+ File.separator + Storage.STORAGE_DIR_CURRENT + File.separator +
+ containerSubDirectory + File.separator + containerId;
+
+ return containerBaseDir;
+ }
+
+ /**
+ * Returns the subdirectory where this container needs to be placed.
+ * @param containerId - container id
+ * @return container sub directory
+ */
+ private static String getContainerSubDirectory(long containerId){
+ int directory = (int) ((containerId >> 9) & 0xFF);
+ return Storage.CONTAINER_DIR + directory;
+ }
+
+ /**
+ * Returns the .container file for the given container.
+ * @param containerMetaDataPath - container metadata directory
+ * @param containerName - name of the container
+ * @return .container File name
+ */
+ public static File getContainerFile(File containerMetaDataPath, String
+ containerName) {
+ Preconditions.checkNotNull(containerMetaDataPath);
+ Preconditions.checkNotNull(containerName);
+ return new File(containerMetaDataPath, containerName +
+ OzoneConsts.CONTAINER_EXTENSION);
+ }
+
+ /**
+ * Returns the container DB file.
+ * @param containerMetaDataPath - container metadata directory
+ * @param containerName - name of the container
+ * @return containerDB File name
+ */
+ public static File getContainerDBFile(File containerMetaDataPath, String
+ containerName) {
+ Preconditions.checkNotNull(containerMetaDataPath);
+ Preconditions.checkNotNull(containerName);
+ return new File(containerMetaDataPath, containerName + OzoneConsts
+ .DN_CONTAINER_DB);
+ }
+
+ /**
+ * Returns the container checksum file.
+ * @param containerMetaDataPath - container metadata directory
+ * @param containerName - name of the container
+ * @return container checksum file
+ */
+ public static File getContainerCheckSumFile(File containerMetaDataPath,
+ String containerName) {
+ Preconditions.checkNotNull(containerMetaDataPath);
+ Preconditions.checkNotNull(containerName);
+ return new File(containerMetaDataPath, containerName + OzoneConsts
+ .CONTAINER_FILE_CHECKSUM_EXTENSION);
+ }
+}
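The sub-directory math above shards containers so that no single directory
grows unbounded: ids map to 256 sub-directories in blocks of 512 containers
each. An illustrative example (the exact directory names come from Storage
and OzoneConsts, so the path shown is approximate):

  // (1025 >> 9) & 0xFF == 2, so container 1025 lands in sub-directory 2,
  // e.g. /data/hdds/scm-uuid/current/containerDir2/1025/metadata
  File metaDir = KeyValueContainerLocationUtil.getContainerMetaDataPath(
      "/data/hdds", "scm-uuid", 1025L);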
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
new file mode 100644
index 0000000..b868f1d
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.keyvalue.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+ .StorageContainerException;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Class which defines utility methods for KeyValueContainer.
+ */
+
+public final class KeyValueContainerUtil {
+
+ /* Never constructed. */
+ private KeyValueContainerUtil() {
+
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ KeyValueContainerUtil.class);
+
+ /**
+ * Verifies that the container does not already exist on disk.
+ *
+ * @param containerFile - container file to check
+ * @throws FileAlreadyExistsException if the container directory exists
+ */
+ public static void verifyIsNewContainer(File containerFile) throws
+ FileAlreadyExistsException {
+ Preconditions.checkNotNull(containerFile, "containerFile should not be" +
+ " null");
+ if (containerFile.getParentFile().exists()) {
+ LOG.error("container already exists on disk. File: {}", containerFile
+ .toPath());
+ throw new FileAlreadyExistsException("container already exists on " +
+ "disk.");
+ }
+ }
+
+ /**
+ * Creates the metadata path, chunks path and metadata DB for the
+ * specified container.
+ *
+ * @param containerMetaDataPath - container metadata directory
+ * @param chunksPath - directory for the container's chunk files
+ * @param dbFile - container metadata DB file
+ * @param containerName - name of the container
+ * @param conf - cluster configuration
+ * @throws IOException
+ */
+ public static void createContainerMetaData(File containerMetaDataPath, File
+ chunksPath, File dbFile, String containerName, Configuration conf) throws
+ IOException {
+ Preconditions.checkNotNull(containerMetaDataPath);
+ Preconditions.checkNotNull(containerName);
+ Preconditions.checkNotNull(conf);
+
+ if (!containerMetaDataPath.mkdirs()) {
+ LOG.error("Unable to create directory for metadata storage. Path: {}",
+ containerMetaDataPath);
+ throw new IOException("Unable to create directory for metadata storage." +
+ " Path: " + containerMetaDataPath);
+ }
+ MetadataStore store = MetadataStoreBuilder.newBuilder().setConf(conf)
+ .setCreateIfMissing(true).setDbFile(dbFile).build();
+
+ // we close since the SCM pre-creates containers.
+ // we will open and put Db handle into a cache when keys are being created
+ // in a container.
+
+ store.close();
+
+ if (!chunksPath.mkdirs()) {
+ LOG.error("Unable to create chunks directory Container {}",
+ chunksPath);
+ //clean up container metadata path and metadata db
+ FileUtils.deleteDirectory(containerMetaDataPath);
+ FileUtils.deleteDirectory(containerMetaDataPath.getParentFile());
+ throw new IOException("Unable to create directory for data storage." +
+ " Path: " + chunksPath);
+ }
+ }
+
+ /**
+ * Removes the container if it is empty, or unconditionally when
+ * forceDelete is set. Three things are deleted:
+ * <p>
+ * 1. The container file and metadata file.
+ * 2. The LevelDB file.
+ * 3. The path created on the data location.
+ *
+ * @param containerData - Data of the container to remove.
+ * @param conf - configuration of the cluster.
+ * @param forceDelete - whether this container should be deleted forcibly.
+ * @throws IOException
+ */
+ public static void removeContainer(KeyValueContainerData containerData,
+ Configuration conf, boolean forceDelete)
+ throws IOException {
+ Preconditions.checkNotNull(containerData);
+ File containerMetaDataPath = new File(containerData
+ .getMetadataPath());
+ File chunksPath = new File(containerData.getChunksPath());
+
+ MetadataStore db = KeyUtils.getDB(containerData, conf);
+
+ // If the container is not empty and cannot be deleted forcibly,
+ // then throw a SCE to stop deleting.
+ if(!forceDelete && !db.isEmpty()) {
+ throw new StorageContainerException(
+ "Container cannot be deleted because it is not empty.",
+ ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY);
+ }
+
+ // Close the DB connection and remove the DB handler from cache
+ KeyUtils.removeDB(containerData, conf);
+
+ // Delete the Container MetaData path.
+ FileUtils.deleteDirectory(containerMetaDataPath);
+
+ //Delete the Container Chunks Path.
+ FileUtils.deleteDirectory(chunksPath);
+
+ //Delete Container directory
+ FileUtils.deleteDirectory(containerMetaDataPath.getParentFile());
+ }
+
+ /**
+ * Returns a ReadContainer Response.
+ *
+ * @param request Request
+ * @param containerData - data
+ * @return Response.
+ */
+ public static ContainerCommandResponseProto getReadContainerResponse(
+ ContainerCommandRequestProto request,
+ KeyValueContainerData containerData) {
+ Preconditions.checkNotNull(containerData);
+
+ ContainerProtos.ReadContainerResponseProto.Builder response =
+ ContainerProtos.ReadContainerResponseProto.newBuilder();
+ response.setContainerData(containerData.getProtoBufMessage());
+
+ ContainerCommandResponseProto.Builder builder =
+ ContainerUtils.getSuccessResponseBuilder(request);
+ builder.setReadContainer(response);
+ return builder.build();
+ }
+}
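Putting the helpers together, creation and forced removal of a container
would look roughly like this; OzoneConfiguration, the literal paths, and
kvData (the KeyValueContainerData built for container 10) are assumptions
for illustration:

  Configuration conf = new OzoneConfiguration();
  File metaPath = KeyValueContainerLocationUtil.getContainerMetaDataPath(
      "/data/hdds", "scm-uuid", 10L);
  File chunksPath = KeyValueContainerLocationUtil.getChunksLocationPath(
      "/data/hdds", "scm-uuid", 10L);
  File dbFile = KeyValueContainerLocationUtil.getContainerDBFile(metaPath, "10");
  File containerFile = KeyValueContainerLocationUtil.getContainerFile(metaPath, "10");

  KeyValueContainerUtil.verifyIsNewContainer(containerFile);
  KeyValueContainerUtil.createContainerMetaData(
      metaPath, chunksPath, dbFile, "10", conf);
  // ... later: delete the metadata dir, chunks dir and DB even if keys remain.
  KeyValueContainerUtil.removeContainer(kvData, conf, true);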
http://git-wip-us.apache.org/repos/asf/hadoop/blob/13579f92/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java
new file mode 100644
index 0000000..df60c60
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.keyvalue.helpers;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandResponseProto;
+
+/**
+ * Helper routines used by the putSmallFile and getSmallFile RPCs.
+ */
+public final class SmallFileUtils {
+ /**
+ * Never Constructed.
+ */
+ private SmallFileUtils() {
+ }
+
+ /**
+ * Gets a response for the putSmallFile RPC.
+ * @param msg - ContainerCommandRequestProto
+ * @return - ContainerCommandResponseProto
+ */
+ public static ContainerCommandResponseProto getPutFileResponseSuccess(
+ ContainerCommandRequestProto msg) {
+ ContainerProtos.PutSmallFileResponseProto.Builder getResponse =
+ ContainerProtos.PutSmallFileResponseProto.newBuilder();
+ ContainerCommandResponseProto.Builder builder =
+ ContainerUtils.getSuccessResponseBuilder(msg);
+ builder.setCmdType(ContainerProtos.Type.PutSmallFile);
+ builder.setPutSmallFile(getResponse);
+ return builder.build();
+ }
+
+ /**
+ * Gets a response to the read small file call.
+ * @param msg - Msg
+ * @param data - Data
+ * @param info - Info
+ * @return Response.
+ */
+ public static ContainerCommandResponseProto getGetSmallFileResponseSuccess(
+ ContainerCommandRequestProto msg, byte[] data, ChunkInfo info) {
+ Preconditions.checkNotNull(msg);
+
+ ContainerProtos.ReadChunkResponseProto.Builder readChunkResponse =
+ ContainerProtos.ReadChunkResponseProto.newBuilder();
+ readChunkResponse.setChunkData(info.getProtoBufMessage());
+ readChunkResponse.setData(ByteString.copyFrom(data));
+ readChunkResponse.setBlockID(msg.getGetSmallFile().getKey().getBlockID());
+
+ ContainerProtos.GetSmallFileResponseProto.Builder getSmallFile =
+ ContainerProtos.GetSmallFileResponseProto.newBuilder();
+ getSmallFile.setData(readChunkResponse.build());
+ ContainerCommandResponseProto.Builder builder =
+ ContainerUtils.getSuccessResponseBuilder(msg);
+ builder.setCmdType(ContainerProtos.Type.GetSmallFile);
+ builder.setGetSmallFile(getSmallFile);
+ return builder.build();
+ }
+
+}
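Worth noting: the getSmallFile response embeds a ReadChunkResponseProto
rather than defining a new payload, so small-file reads and chunk reads
share a wire format. A hypothetical handler fragment (chunkData and
chunkInfo are assumed names):

  // After reading the single chunk that backs the small file:
  return SmallFileUtils.getGetSmallFileResponseSuccess(
      request, chunkData, chunkInfo);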