You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2018/07/11 23:10:04 UTC
[04/56] [abbrv] hadoop git commit: HDDS-167. Rename KeySpaceManager
to OzoneManager. Contributed by Arpit Agarwal.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
deleted file mode 100644
index cc2f78a..0000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
-import org.apache.hadoop.ozone.protocol.proto
- .KeySpaceManagerProtocolProtos.OzoneAclInfo;
-import org.apache.hadoop.ozone.protocol.proto
- .KeySpaceManagerProtocolProtos.VolumeList;
-import org.apache.hadoop.ozone.protocol.proto
- .KeySpaceManagerProtocolProtos.VolumeInfo;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.utils.BatchOperation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
- .OZONE_KSM_USER_MAX_VOLUME_DEFAULT;
-import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
- .OZONE_KSM_USER_MAX_VOLUME;
-import static org.apache.hadoop.ozone.ksm.exceptions
- .KSMException.ResultCodes;
-
-/**
- * KSM volume management code.
- */
-public class VolumeManagerImpl implements VolumeManager {
- private static final Logger LOG =
- LoggerFactory.getLogger(VolumeManagerImpl.class);
-
- private final KSMMetadataManager metadataManager;
- private final int maxUserVolumeCount;
-
- /**
- * Constructor.
- * @param conf - Ozone configuration.
- * @throws IOException
- */
- public VolumeManagerImpl(KSMMetadataManager metadataManager,
- OzoneConfiguration conf) throws IOException {
- this.metadataManager = metadataManager;
- this.maxUserVolumeCount = conf.getInt(OZONE_KSM_USER_MAX_VOLUME,
- OZONE_KSM_USER_MAX_VOLUME_DEFAULT);
- }
-
- // Helpers to add and delete volume from user list
- private void addVolumeToOwnerList(String volume, String owner,
- BatchOperation batchOperation) throws IOException {
- // Get the volume list
- byte[] dbUserKey = metadataManager.getUserKey(owner);
- byte[] volumeList = metadataManager.get(dbUserKey);
- List<String> prevVolList = new LinkedList<>();
- if (volumeList != null) {
- VolumeList vlist = VolumeList.parseFrom(volumeList);
- prevVolList.addAll(vlist.getVolumeNamesList());
- }
-
- // Check the volume count
- if (prevVolList.size() >= maxUserVolumeCount) {
- LOG.debug("Too many volumes for user:{}", owner);
- throw new KSMException(ResultCodes.FAILED_TOO_MANY_USER_VOLUMES);
- }
-
- // Add the new volume to the list
- prevVolList.add(volume);
- VolumeList newVolList = VolumeList.newBuilder()
- .addAllVolumeNames(prevVolList).build();
- batchOperation.put(dbUserKey, newVolList.toByteArray());
- }
-
- private void delVolumeFromOwnerList(String volume, String owner,
- BatchOperation batchOperation)
- throws IOException {
- // Get the volume list
- byte[] dbUserKey = metadataManager.getUserKey(owner);
- byte[] volumeList = metadataManager.get(dbUserKey);
- List<String> prevVolList = new LinkedList<>();
- if (volumeList != null) {
- VolumeList vlist = VolumeList.parseFrom(volumeList);
- prevVolList.addAll(vlist.getVolumeNamesList());
- } else {
- LOG.debug("volume:{} not found for user:{}");
- throw new KSMException(ResultCodes.FAILED_USER_NOT_FOUND);
- }
-
- // Remove the volume from the list
- prevVolList.remove(volume);
- if (prevVolList.size() == 0) {
- batchOperation.delete(dbUserKey);
- } else {
- VolumeList newVolList = VolumeList.newBuilder()
- .addAllVolumeNames(prevVolList).build();
- batchOperation.put(dbUserKey, newVolList.toByteArray());
- }
- }
-
- /**
- * Creates a volume.
- * @param args - KsmVolumeArgs.
- */
- @Override
- public void createVolume(KsmVolumeArgs args) throws IOException {
- Preconditions.checkNotNull(args);
- metadataManager.writeLock().lock();
- try {
- byte[] dbVolumeKey = metadataManager.getVolumeKey(args.getVolume());
- byte[] volumeInfo = metadataManager.get(dbVolumeKey);
-
- // Check of the volume already exists
- if (volumeInfo != null) {
- LOG.debug("volume:{} already exists", args.getVolume());
- throw new KSMException(ResultCodes.FAILED_VOLUME_ALREADY_EXISTS);
- }
-
- BatchOperation batch = new BatchOperation();
- // Write the vol info
- List<HddsProtos.KeyValue> metadataList = new LinkedList<>();
- for (Map.Entry<String, String> entry : args.getKeyValueMap().entrySet()) {
- metadataList.add(HddsProtos.KeyValue.newBuilder()
- .setKey(entry.getKey()).setValue(entry.getValue()).build());
- }
- List<OzoneAclInfo> aclList = args.getAclMap().ozoneAclGetProtobuf();
-
- VolumeInfo newVolumeInfo = VolumeInfo.newBuilder()
- .setAdminName(args.getAdminName())
- .setOwnerName(args.getOwnerName())
- .setVolume(args.getVolume())
- .setQuotaInBytes(args.getQuotaInBytes())
- .addAllMetadata(metadataList)
- .addAllVolumeAcls(aclList)
- .setCreationTime(Time.now())
- .build();
- batch.put(dbVolumeKey, newVolumeInfo.toByteArray());
-
- // Add volume to user list
- addVolumeToOwnerList(args.getVolume(), args.getOwnerName(), batch);
- metadataManager.writeBatch(batch);
- LOG.debug("created volume:{} user:{}", args.getVolume(),
- args.getOwnerName());
- } catch (IOException ex) {
- if (!(ex instanceof KSMException)) {
- LOG.error("Volume creation failed for user:{} volume:{}",
- args.getOwnerName(), args.getVolume(), ex);
- }
- throw ex;
- } finally {
- metadataManager.writeLock().unlock();
- }
- }
-
- /**
- * Changes the owner of a volume.
- *
- * @param volume - Name of the volume.
- * @param owner - Name of the owner.
- * @throws IOException
- */
- @Override
- public void setOwner(String volume, String owner) throws IOException {
- Preconditions.checkNotNull(volume);
- Preconditions.checkNotNull(owner);
- metadataManager.writeLock().lock();
- try {
- byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
- byte[] volInfo = metadataManager.get(dbVolumeKey);
- if (volInfo == null) {
- LOG.debug("Changing volume ownership failed for user:{} volume:{}",
- owner, volume);
- throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
- }
-
- VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
- KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
- Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
-
- BatchOperation batch = new BatchOperation();
- delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch);
- addVolumeToOwnerList(volume, owner, batch);
-
- KsmVolumeArgs newVolumeArgs =
- KsmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume())
- .setAdminName(volumeArgs.getAdminName())
- .setOwnerName(owner)
- .setQuotaInBytes(volumeArgs.getQuotaInBytes())
- .setCreationTime(volumeArgs.getCreationTime())
- .build();
-
- VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf();
- batch.put(dbVolumeKey, newVolumeInfo.toByteArray());
-
- metadataManager.writeBatch(batch);
- } catch (IOException ex) {
- if (!(ex instanceof KSMException)) {
- LOG.error("Changing volume ownership failed for user:{} volume:{}",
- owner, volume, ex);
- }
- throw ex;
- } finally {
- metadataManager.writeLock().unlock();
- }
- }
-
- /**
- * Changes the Quota on a volume.
- *
- * @param volume - Name of the volume.
- * @param quota - Quota in bytes.
- * @throws IOException
- */
- public void setQuota(String volume, long quota) throws IOException {
- Preconditions.checkNotNull(volume);
- metadataManager.writeLock().lock();
- try {
- byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
- byte[] volInfo = metadataManager.get(dbVolumeKey);
- if (volInfo == null) {
- LOG.debug("volume:{} does not exist", volume);
- throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
- }
-
- VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
- KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
- Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
-
- KsmVolumeArgs newVolumeArgs =
- KsmVolumeArgs.newBuilder()
- .setVolume(volumeArgs.getVolume())
- .setAdminName(volumeArgs.getAdminName())
- .setOwnerName(volumeArgs.getOwnerName())
- .setQuotaInBytes(quota)
- .setCreationTime(volumeArgs.getCreationTime()).build();
-
- VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf();
- metadataManager.put(dbVolumeKey, newVolumeInfo.toByteArray());
- } catch (IOException ex) {
- if (!(ex instanceof KSMException)) {
- LOG.error("Changing volume quota failed for volume:{} quota:{}", volume,
- quota, ex);
- }
- throw ex;
- } finally {
- metadataManager.writeLock().unlock();
- }
- }
-
- /**
- * Gets the volume information.
- * @param volume - Volume name.
- * @return VolumeArgs or exception is thrown.
- * @throws IOException
- */
- public KsmVolumeArgs getVolumeInfo(String volume) throws IOException {
- Preconditions.checkNotNull(volume);
- metadataManager.readLock().lock();
- try {
- byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
- byte[] volInfo = metadataManager.get(dbVolumeKey);
- if (volInfo == null) {
- LOG.debug("volume:{} does not exist", volume);
- throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
- }
-
- VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
- KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
- Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
- return volumeArgs;
- } catch (IOException ex) {
- if (!(ex instanceof KSMException)) {
- LOG.warn("Info volume failed for volume:{}", volume, ex);
- }
- throw ex;
- } finally {
- metadataManager.readLock().unlock();
- }
- }
-
- /**
- * Deletes an existing empty volume.
- *
- * @param volume - Name of the volume.
- * @throws IOException
- */
- @Override
- public void deleteVolume(String volume) throws IOException {
- Preconditions.checkNotNull(volume);
- metadataManager.writeLock().lock();
- try {
- BatchOperation batch = new BatchOperation();
- byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
- byte[] volInfo = metadataManager.get(dbVolumeKey);
- if (volInfo == null) {
- LOG.debug("volume:{} does not exist", volume);
- throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
- }
-
- if (!metadataManager.isVolumeEmpty(volume)) {
- LOG.debug("volume:{} is not empty", volume);
- throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_EMPTY);
- }
-
- VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
- Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
- // delete the volume from the owner list
- // as well as delete the volume entry
- delVolumeFromOwnerList(volume, volumeInfo.getOwnerName(), batch);
- batch.delete(dbVolumeKey);
- metadataManager.writeBatch(batch);
- } catch (IOException ex) {
- if (!(ex instanceof KSMException)) {
- LOG.error("Delete volume failed for volume:{}", volume, ex);
- }
- throw ex;
- } finally {
- metadataManager.writeLock().unlock();
- }
- }
-
- /**
- * Checks if the specified user with a role can access this volume.
- *
- * @param volume - volume
- * @param userAcl - user acl which needs to be checked for access
- * @return true if the user has access for the volume, false otherwise
- * @throws IOException
- */
- public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
- throws IOException {
- Preconditions.checkNotNull(volume);
- Preconditions.checkNotNull(userAcl);
- metadataManager.readLock().lock();
- try {
- byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
- byte[] volInfo = metadataManager.get(dbVolumeKey);
- if (volInfo == null) {
- LOG.debug("volume:{} does not exist", volume);
- throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
- }
-
- VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
- KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
- Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
- return volumeArgs.getAclMap().hasAccess(userAcl);
- } catch (IOException ex) {
- if (!(ex instanceof KSMException)) {
- LOG.error("Check volume access failed for volume:{} user:{} rights:{}",
- volume, userAcl.getName(), userAcl.getRights(), ex);
- }
- throw ex;
- } finally {
- metadataManager.readLock().unlock();
- }
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public List<KsmVolumeArgs> listVolumes(String userName,
- String prefix, String startKey, int maxKeys) throws IOException {
- metadataManager.readLock().lock();
- try {
- return metadataManager.listVolumes(
- userName, prefix, startKey, maxKeys);
- } finally {
- metadataManager.readLock().unlock();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
deleted file mode 100644
index b902eab..0000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.ksm.exceptions;
-
-import java.io.IOException;
-
-/**
- * Exception thrown by KSM.
- */
-public class KSMException extends IOException {
- private final KSMException.ResultCodes result;
-
- /**
- * Constructs an {@code IOException} with {@code null}
- * as its error detail message.
- */
- public KSMException(KSMException.ResultCodes result) {
- this.result = result;
- }
-
- /**
- * Constructs an {@code IOException} with the specified detail message.
- *
- * @param message The detail message (which is saved for later retrieval by
- * the
- * {@link #getMessage()} method)
- */
- public KSMException(String message, KSMException.ResultCodes result) {
- super(message);
- this.result = result;
- }
-
- /**
- * Constructs an {@code IOException} with the specified detail message
- * and cause.
- * <p>
- * <p> Note that the detail message associated with {@code cause} is
- * <i>not</i> automatically incorporated into this exception's detail
- * message.
- *
- * @param message The detail message (which is saved for later retrieval by
- * the
- * {@link #getMessage()} method)
- * @param cause The cause (which is saved for later retrieval by the {@link
- * #getCause()} method). (A null value is permitted, and indicates that the
- * cause is nonexistent or unknown.)
- * @since 1.6
- */
- public KSMException(String message, Throwable cause,
- KSMException.ResultCodes result) {
- super(message, cause);
- this.result = result;
- }
-
- /**
- * Constructs an {@code IOException} with the specified cause and a
- * detail message of {@code (cause==null ? null : cause.toString())}
- * (which typically contains the class and detail message of {@code cause}).
- * This constructor is useful for IO exceptions that are little more
- * than wrappers for other throwables.
- *
- * @param cause The cause (which is saved for later retrieval by the {@link
- * #getCause()} method). (A null value is permitted, and indicates that the
- * cause is nonexistent or unknown.)
- * @since 1.6
- */
- public KSMException(Throwable cause, KSMException.ResultCodes result) {
- super(cause);
- this.result = result;
- }
-
- /**
- * Returns resultCode.
- * @return ResultCode
- */
- public KSMException.ResultCodes getResult() {
- return result;
- }
-
- /**
- * Error codes to make it easy to decode these exceptions.
- */
- public enum ResultCodes {
- FAILED_TOO_MANY_USER_VOLUMES,
- FAILED_VOLUME_ALREADY_EXISTS,
- FAILED_VOLUME_NOT_FOUND,
- FAILED_VOLUME_NOT_EMPTY,
- FAILED_USER_NOT_FOUND,
- FAILED_BUCKET_ALREADY_EXISTS,
- FAILED_BUCKET_NOT_FOUND,
- FAILED_BUCKET_NOT_EMPTY,
- FAILED_KEY_ALREADY_EXISTS,
- FAILED_KEY_NOT_FOUND,
- FAILED_KEY_ALLOCATION,
- FAILED_KEY_DELETION,
- FAILED_KEY_RENAME,
- FAILED_INVALID_KEY_NAME,
- FAILED_METADATA_ERROR,
- FAILED_INTERNAL_ERROR,
- KSM_NOT_INITIALIZED,
- SCM_VERSION_MISMATCH_ERROR
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/package-info.java
deleted file mode 100644
index 09fd87f..0000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.ksm.exceptions;
-// Exception thrown by KSM.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/package-info.java
deleted file mode 100644
index 09d9f32..0000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.ksm;
-/*
- This package contains the keyspace manager classes.
- */
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManager.java
new file mode 100644
index 0000000..ddb2b0e
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManager.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * BucketManager handles all the bucket level operations.
+ */
+public interface BucketManager {
+ /**
+ * Creates a bucket.
+ * @param bucketInfo - OmBucketInfo for creating bucket.
+ */
+ void createBucket(OmBucketInfo bucketInfo) throws IOException;
+ /**
+ * Returns Bucket Information.
+ * @param volumeName - Name of the Volume.
+ * @param bucketName - Name of the Bucket.
+ */
+ OmBucketInfo getBucketInfo(String volumeName, String bucketName)
+ throws IOException;
+
+ /**
+ * Sets bucket property from args.
+ * @param args - BucketArgs.
+ * @throws IOException
+ */
+ void setBucketProperty(OmBucketArgs args) throws IOException;
+
+ /**
+ * Deletes an existing empty bucket from volume.
+ * @param volumeName - Name of the volume.
+ * @param bucketName - Name of the bucket.
+ * @throws IOException
+ */
+ void deleteBucket(String volumeName, String bucketName) throws IOException;
+
+ /**
+ * Returns a list of buckets represented by {@link OmBucketInfo}
+ * in the given volume.
+ *
+ * @param volumeName
+ * Required parameter volume name determines buckets in which volume
+ * to return.
+ * @param startBucket
+ * Optional start bucket name parameter indicating where to start
+ * the bucket listing from; this bucket is excluded from the result.
+ * @param bucketPrefix
+ * Optional bucket name prefix, restricting the response to buckets
+ * whose names begin with the specified prefix.
+ * @param maxNumOfBuckets
+ * The maximum number of buckets to return. It ensures
+ * the size of the result will not exceed this limit.
+ * @return a list of buckets.
+ * @throws IOException
+ */
+ List<OmBucketInfo> listBuckets(String volumeName,
+ String startBucket, String bucketPrefix, int maxNumOfBuckets)
+ throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
new file mode 100644
index 0000000..4bbce81
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.protocol.proto
+ .OzoneManagerProtocolProtos.BucketInfo;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.util.Time;
+import org.iq80.leveldb.DBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * OM bucket manager.
+ */
+public class BucketManagerImpl implements BucketManager {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(BucketManagerImpl.class);
+
+ /**
+ * OMMetadataManager is used for accessing OM MetadataDB and ReadWriteLock.
+ */
+ private final OMMetadataManager metadataManager;
+
+ /**
+ * Constructs BucketManager.
+ * @param metadataManager - used for accessing OM MetadataDB and its locks.
+ */
+ public BucketManagerImpl(OMMetadataManager metadataManager){
+ this.metadataManager = metadataManager;
+ }
+
+ /**
+ * MetadataDB is maintained in MetadataManager and shared between
+ * BucketManager and VolumeManager. (and also by KeyManager)
+ *
+ * BucketManager uses MetadataDB to store bucket level information.
+ *
+ * Keys used in BucketManager for storing data into MetadataDB
+ * for BucketInfo:
+ * {volume/bucket} -> bucketInfo
+ *
+ * Work flow of create bucket:
+ *
+ * -> Check if the Volume exists in metadataDB, if not throw
+ * OMException with FAILED_VOLUME_NOT_FOUND.
+ * -> Else check if the Bucket exists in metadataDB, if so throw
+ * OMException with FAILED_BUCKET_ALREADY_EXISTS.
+ * -> Else update MetadataDB with BucketInfo.
+ */
+
+ /**
+ * Creates a bucket.
+ * @param bucketInfo - OmBucketInfo.
+ */
+ @Override
+ public void createBucket(OmBucketInfo bucketInfo) throws IOException {
+ Preconditions.checkNotNull(bucketInfo);
+ metadataManager.writeLock().lock();
+ String volumeName = bucketInfo.getVolumeName();
+ String bucketName = bucketInfo.getBucketName();
+ try {
+ byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
+ byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+
+ //Check if the volume exists
+ if (metadataManager.get(volumeKey) == null) {
+ LOG.debug("volume: {} not found ", volumeName);
+ throw new OMException("Volume doesn't exist",
+ OMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
+ }
+ //Check if bucket already exists
+ if (metadataManager.get(bucketKey) != null) {
+ LOG.debug("bucket: {} already exists ", bucketName);
+ throw new OMException("Bucket already exist",
+ OMException.ResultCodes.FAILED_BUCKET_ALREADY_EXISTS);
+ }
+
+ OmBucketInfo omBucketInfo = OmBucketInfo.newBuilder()
+ .setVolumeName(bucketInfo.getVolumeName())
+ .setBucketName(bucketInfo.getBucketName())
+ .setAcls(bucketInfo.getAcls())
+ .setStorageType(bucketInfo.getStorageType())
+ .setIsVersionEnabled(bucketInfo.getIsVersionEnabled())
+ .setCreationTime(Time.now())
+ .build();
+ metadataManager.put(bucketKey, omBucketInfo.getProtobuf().toByteArray());
+
+ LOG.debug("created bucket: {} in volume: {}", bucketName, volumeName);
+ } catch (IOException | DBException ex) {
+ if (!(ex instanceof OMException)) {
+ LOG.error("Bucket creation failed for bucket:{} in volume:{}",
+ bucketName, volumeName, ex);
+ }
+ throw ex;
+ } finally {
+ metadataManager.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Returns Bucket Information.
+ *
+ * @param volumeName - Name of the Volume.
+ * @param bucketName - Name of the Bucket.
+ */
+ @Override
+ public OmBucketInfo getBucketInfo(String volumeName, String bucketName)
+ throws IOException {
+ Preconditions.checkNotNull(volumeName);
+ Preconditions.checkNotNull(bucketName);
+ metadataManager.readLock().lock();
+ try {
+ byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+ byte[] value = metadataManager.get(bucketKey);
+ if (value == null) {
+ LOG.debug("bucket: {} not found in volume: {}.", bucketName,
+ volumeName);
+ throw new OMException("Bucket not found",
+ OMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
+ }
+ return OmBucketInfo.getFromProtobuf(BucketInfo.parseFrom(value));
+ } catch (IOException | DBException ex) {
+ if (!(ex instanceof OMException)) {
+ LOG.error("Exception while getting bucket info for bucket: {}",
+ bucketName, ex);
+ }
+ throw ex;
+ } finally {
+ metadataManager.readLock().unlock();
+ }
+ }
+
+ /**
+  * Updates the properties of an existing bucket.
+  * <p>
+  * ACLs, storage type and versioning are replaced only when {@code args}
+  * carries a value for them; otherwise the stored value is carried over.
+  * Volume name, bucket name and creation time are always copied from the
+  * stored bucket info. The update runs under the metadata write lock.
+  *
+  * @param args - BucketArgs.
+  * @throws IOException if the volume or the bucket does not exist, or if
+  * reading or writing the metadata store fails.
+  */
+ @Override
+ public void setBucketProperty(OmBucketArgs args) throws IOException {
+   Preconditions.checkNotNull(args);
+   metadataManager.writeLock().lock();
+   String volumeName = args.getVolumeName();
+   String bucketName = args.getBucketName();
+   try {
+     byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+
+     // The enclosing volume has to exist.
+     byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
+     if (metadataManager.get(volumeKey) == null) {
+       LOG.debug("volume: {} not found ", volumeName);
+       throw new OMException("Volume doesn't exist",
+           OMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
+     }
+
+     // The bucket itself has to exist.
+     byte[] storedBucket = metadataManager.get(bucketKey);
+     if (storedBucket == null) {
+       LOG.debug("bucket: {} not found ", bucketName);
+       throw new OMException("Bucket doesn't exist",
+           OMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
+     }
+
+     OmBucketInfo current =
+         OmBucketInfo.getFromProtobuf(BucketInfo.parseFrom(storedBucket));
+     OmBucketInfo.Builder updated = OmBucketInfo.newBuilder();
+     updated.setVolumeName(current.getVolumeName())
+         .setBucketName(current.getBucketName());
+
+     // ACLs: keep the stored list unless an add or remove list was supplied.
+     if (args.getAddAcls() == null && args.getRemoveAcls() == null) {
+       updated.setAcls(current.getAcls());
+     } else {
+       updated.setAcls(getUpdatedAclList(current.getAcls(),
+           args.getRemoveAcls(), args.getAddAcls()));
+       LOG.debug("Updating ACLs for bucket: {} in volume: {}",
+           bucketName, volumeName);
+     }
+
+     // Storage type: keep the stored one unless a new one was supplied.
+     StorageType requestedStorageType = args.getStorageType();
+     if (requestedStorageType == null) {
+       updated.setStorageType(current.getStorageType());
+     } else {
+       updated.setStorageType(requestedStorageType);
+       LOG.debug("Updating bucket storage type for bucket: {} in volume: {}",
+           bucketName, volumeName);
+     }
+
+     // Versioning: keep the stored flag unless a new one was supplied.
+     Boolean requestedVersioning = args.getIsVersionEnabled();
+     if (requestedVersioning == null) {
+       updated.setIsVersionEnabled(current.getIsVersionEnabled());
+     } else {
+       updated.setIsVersionEnabled(requestedVersioning);
+       LOG.debug("Updating bucket versioning for bucket: {} in volume: {}",
+           bucketName, volumeName);
+     }
+     updated.setCreationTime(current.getCreationTime());
+
+     metadataManager.put(bucketKey,
+         updated.build().getProtobuf().toByteArray());
+   } catch (IOException | DBException ex) {
+     // OMExceptions were already logged at debug level where they were raised.
+     boolean alreadyReported = ex instanceof OMException;
+     if (!alreadyReported) {
+       LOG.error("Setting bucket property failed for bucket:{} in volume:{}",
+           bucketName, volumeName, ex);
+     }
+     throw ex;
+   } finally {
+     metadataManager.writeLock().unlock();
+   }
+ }
+
+ /**
+  * Computes the ACL list that results from applying the removals and then
+  * the additions to an existing ACL list. Remove is done before Add.
+  * <p>
+  * The input lists are never modified; the result is a new list. An ACL
+  * from {@code addAcls} is appended only if it is not already present, so
+  * the result never gains duplicates from the add list.
+  *
+  * @param existingAcls - old ACL list, may be null (treated as empty).
+  * @param removeAcls - ACLs to be removed, may be null.
+  * @param addAcls - ACLs to be added, may be null.
+  * @return updated ACL list.
+  */
+ private List<OzoneAcl> getUpdatedAclList(List<OzoneAcl> existingAcls,
+     List<OzoneAcl> removeAcls, List<OzoneAcl> addAcls) {
+   // Work on a copy: the caller's list may be shared or unmodifiable, and
+   // mutating it in place would either leak the change or throw.
+   List<OzoneAcl> updatedAcls = existingAcls == null
+       ? new java.util.ArrayList<>()
+       : new java.util.ArrayList<>(existingAcls);
+   if (removeAcls != null) {
+     updatedAcls.removeAll(removeAcls);
+   }
+   if (addAcls != null) {
+     for (OzoneAcl acl : addAcls) {
+       if (!updatedAcls.contains(acl)) {
+         updatedAcls.add(acl);
+       }
+     }
+   }
+   return updatedAcls;
+ }
+
+ /**
+  * Deletes an existing empty bucket from volume.
+  * <p>
+  * Under the metadata write lock this checks, in order, that the volume
+  * exists, that the bucket exists and that the bucket is empty, and only
+  * then removes the bucket entry.
+  *
+  * @param volumeName - Name of the volume.
+  * @param bucketName - Name of the bucket.
+  * @throws IOException if the volume or bucket is missing, the bucket is
+  * not empty, or the metadata store fails.
+  */
+ public void deleteBucket(String volumeName, String bucketName)
+     throws IOException {
+   Preconditions.checkNotNull(volumeName);
+   Preconditions.checkNotNull(bucketName);
+   metadataManager.writeLock().lock();
+   try {
+     byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+
+     // The enclosing volume has to exist.
+     byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
+     if (metadataManager.get(volumeKey) == null) {
+       LOG.debug("volume: {} not found ", volumeName);
+       throw new OMException("Volume doesn't exist",
+           OMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
+     }
+
+     // The bucket has to exist.
+     boolean bucketMissing = metadataManager.get(bucketKey) == null;
+     if (bucketMissing) {
+       LOG.debug("bucket: {} not found ", bucketName);
+       throw new OMException("Bucket doesn't exist",
+           OMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
+     }
+
+     // Only empty buckets may be deleted.
+     boolean bucketEmpty = metadataManager.isBucketEmpty(volumeName, bucketName);
+     if (!bucketEmpty) {
+       LOG.debug("bucket: {} is not empty ", bucketName);
+       throw new OMException("Bucket is not empty",
+           OMException.ResultCodes.FAILED_BUCKET_NOT_EMPTY);
+     }
+
+     metadataManager.delete(bucketKey);
+   } catch (IOException ex) {
+     // OMExceptions were already logged at debug level where they were raised.
+     boolean alreadyReported = ex instanceof OMException;
+     if (!alreadyReported) {
+       LOG.error("Delete bucket failed for bucket:{} in volume:{}", bucketName,
+           volumeName, ex);
+     }
+     throw ex;
+   } finally {
+     metadataManager.writeLock().unlock();
+   }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Delegates to the metadata manager while holding the metadata read lock.
+  */
+ @Override
+ public List<OmBucketInfo> listBuckets(String volumeName,
+     String startBucket, String bucketPrefix, int maxNumOfBuckets)
+     throws IOException {
+   Preconditions.checkNotNull(volumeName);
+   metadataManager.readLock().lock();
+   try {
+     List<OmBucketInfo> buckets = metadataManager.listBuckets(
+         volumeName, startBucket, bucketPrefix, maxNumOfBuckets);
+     return buckets;
+   } finally {
+     metadataManager.readLock().unlock();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
new file mode 100644
index 0000000..ee23fe0
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.BackgroundService;
+import org.apache.hadoop.utils.BackgroundTask;
+import org.apache.hadoop.utils.BackgroundTaskQueue;
+import org.apache.hadoop.utils.BackgroundTaskResult;
+import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
+
+/**
+ * This is the background service to delete keys.
+ * Scan the metadata of om periodically to get
+ * the keys with prefix "#deleting" and ask scm to
+ * delete metadata accordingly, if scm returns
+ * success for keys, then clean up those keys.
+ */
+public class KeyDeletingService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(KeyDeletingService.class);
+
+  // The thread pool size for key deleting service.
+  private final static int KEY_DELETING_CORE_POOL_SIZE = 2;
+
+  private final ScmBlockLocationProtocol scmClient;
+  private final KeyManager manager;
+  // Maximum number of pending-deletion keys fetched by one task run.
+  private final int keyLimitPerTask;
+
+  /**
+   * Creates the service.
+   *
+   * @param scmClient client used to ask SCM to delete key blocks.
+   * @param manager key manager providing and purging pending-deletion keys.
+   * @param serviceInterval interval between runs, in milliseconds.
+   * @param serviceTimeout timeout handed to the background service.
+   * @param conf configuration the per-task key limit is read from.
+   */
+  public KeyDeletingService(ScmBlockLocationProtocol scmClient,
+      KeyManager manager, long serviceInterval,
+      long serviceTimeout, Configuration conf) {
+    super("KeyDeletingService", serviceInterval, TimeUnit.MILLISECONDS,
+        KEY_DELETING_CORE_POOL_SIZE, serviceTimeout);
+    this.scmClient = scmClient;
+    this.manager = manager;
+    this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK,
+        OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
+  }
+
+  /**
+   * Returns a queue holding a single {@link KeyDeletingTask}.
+   */
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new KeyDeletingTask());
+    return queue;
+  }
+
+  /**
+   * A key deleting task scans OM DB and looking for a certain number
+   * of pending-deletion keys, sends these keys along with their associated
+   * blocks to SCM for deletion. Once SCM confirms keys are deleted (once
+   * SCM persisted the blocks info in its deletedBlockLog), it removes
+   * these keys from the DB.
+   */
+  private class KeyDeletingTask implements
+      BackgroundTask<BackgroundTaskResult> {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    /**
+     * Runs one deletion pass.
+     *
+     * @return a result whose size is the number of results returned by SCM,
+     * or an empty result when nothing was pending or the pass failed.
+     */
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+      try {
+        long startTime = Time.monotonicNow();
+        List<BlockGroup> keyBlocksList = manager
+            .getPendingDeletionKeys(keyLimitPerTask);
+        if (keyBlocksList.isEmpty()) {
+          LOG.debug("No pending deletion key found in OM");
+          return EmptyTaskResult.newResult();
+        }
+
+        LOG.info("Found {} to-delete keys in OM", keyBlocksList.size());
+        List<DeleteBlockGroupResult> results =
+            scmClient.deleteKeyBlocks(keyBlocksList);
+
+        // Count only keys that were really purged; results also contains
+        // keys SCM failed on and keys whose purge threw.
+        int purgedKeys = 0;
+        for (DeleteBlockGroupResult result : results) {
+          if (result.isSuccess()) {
+            try {
+              // Purge key from OM DB.
+              manager.deletePendingDeletionKey(result.getObjectKey());
+              purgedKeys++;
+              LOG.debug("Key {} deleted from OM DB", result.getObjectKey());
+            } catch (IOException e) {
+              // if a pending deletion key is failed to delete,
+              // print a warning here and retain it in this state,
+              // so that it can be attempt to delete next time.
+              LOG.warn("Failed to delete pending-deletion key {}",
+                  result.getObjectKey(), e);
+            }
+          } else {
+            // Key deletion failed, retry in next interval.
+            LOG.warn("Key {} deletion failed because some of the blocks"
+                    + " were failed to delete, failed blocks: {}",
+                result.getObjectKey(),
+                StringUtils.join(",", result.getFailedBlocks()));
+          }
+        }
+
+        if (!results.isEmpty()) {
+          LOG.info("Number of key deleted from OM DB: {},"
+                  + " task elapsed time: {}ms",
+              purgedKeys, Time.monotonicNow() - startTime);
+        }
+
+        return results::size;
+      } catch (IOException e) {
+        LOG.error("Unable to get pending deletion keys, retry in"
+            + " next interval", e);
+      }
+      return EmptyTaskResult.newResult();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
new file mode 100644
index 0000000..226c07d
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Handles key level commands.
+ * <p>
+ * Declares the key operations: opening, committing, looking up, renaming,
+ * listing and deleting keys, plus the maintenance calls for
+ * pending-deletion keys and expired open keys.
+ */
+public interface KeyManager {
+
+ /**
+ * Start key manager.
+ */
+ void start();
+
+ /**
+ * Stop key manager.
+ *
+ * @throws IOException
+ */
+ void stop() throws IOException;
+
+ /**
+ * After calling commit, the key will be made visible. There can be multiple
+ * open key writes in parallel (identified by client id). The most recently
+ * committed one will be the one visible.
+ *
+ * @param args the key to commit.
+ * @param clientID the client that is committing.
+ * @throws IOException
+ */
+ void commitKey(OmKeyArgs args, int clientID) throws IOException;
+
+ /**
+ * A client calls this on an open key to request the allocation of a new
+ * block, which is appended to the tail of the current block list of the
+ * open key.
+ *
+ * @param args the key to append
+ * @param clientID the client requesting block.
+ * @return the reference to the new block.
+ * @throws IOException
+ */
+ OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID)
+ throws IOException;
+ /**
+ * Given the args of a key to put, write an open key entry to meta data.
+ *
+ * In case that the container creation or key write failed on
+ * DistributedStorageHandler, this key's metadata will still stay in OM.
+ * TODO garbage collect the open keys that never get closed
+ *
+ * @param args the args of the key provided by client.
+ * @return an OpenKeySession instance client uses to talk to container.
+ * @throws IOException
+ */
+ OpenKeySession openKey(OmKeyArgs args) throws IOException;
+
+ /**
+ * Look up an existing key. Return the info of the key to client side, which
+ * DistributedStorageHandler will use to access the data on datanode.
+ *
+ * @param args the args of the key provided by client.
+ * @return an OmKeyInfo instance client uses to talk to container.
+ * @throws IOException
+ */
+ OmKeyInfo lookupKey(OmKeyArgs args) throws IOException;
+
+ /**
+ * Renames an existing key within a bucket.
+ *
+ * @param args the args of the key provided by client.
+ * @param toKeyName New name to be used for the key
+ * @throws IOException if specified key doesn't exist or
+ * some other I/O errors while renaming the key.
+ */
+ void renameKey(OmKeyArgs args, String toKeyName) throws IOException;
+
+ /**
+ * Deletes an object by an object key. The key will be immediately removed
+ * from OM namespace and become invisible to clients. The object data
+ * is removed asynchronously and may be retained for some time.
+ *
+ * @param args the args of the key provided by client.
+ * @throws IOException if specified key doesn't exist or
+ * some other I/O errors while deleting an object.
+ */
+ void deleteKey(OmKeyArgs args) throws IOException;
+
+ /**
+ * Returns a list of keys represented by {@link OmKeyInfo}
+ * in the given bucket.
+ *
+ * @param volumeName
+ * the name of the volume.
+ * @param bucketName
+ * the name of the bucket.
+ * @param startKey
+ * the start key name, only the keys whose name is
+ * after this value will be included in the result.
+ * This key is excluded from the result.
+ * @param keyPrefix
+ * key name prefix, only the keys whose name has
+ * this prefix will be included in the result.
+ * @param maxKeys
+ * the maximum number of keys to return. It ensures
+ * the size of the result will not exceed this limit.
+ * @return a list of keys.
+ * @throws IOException
+ */
+ List<OmKeyInfo> listKeys(String volumeName,
+ String bucketName, String startKey, String keyPrefix, int maxKeys)
+ throws IOException;
+
+ /**
+ * Returns a list of pending deletion key info, up to the given count.
+ * Each entry is a {@link BlockGroup}, which contains the info about the
+ * key name and all its associated block IDs. A pending deletion key is
+ * stored with #deleting# prefix in OM DB.
+ *
+ * @param count max number of keys to return.
+ * @return a list of {@link BlockGroup} representing keys and blocks.
+ * @throws IOException
+ */
+ List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
+
+ /**
+ * Deletes a pending deletion key by its name. This is often called when
+ * key can be safely deleted from this layer. Once called, all footprints
+ * of the key will be purged from OM DB.
+ *
+ * @param objectKeyName object key name with #deleting# prefix.
+ * @throws IOException if specified key doesn't exist or other I/O errors.
+ */
+ void deletePendingDeletionKey(String objectKeyName) throws IOException;
+
+ /**
+ * Returns a list of all expired open key info. Each entry contains the
+ * info about the key name and all its associated block IDs. A pending open
+ * key has prefix #open# in OM DB.
+ *
+ * @return a list of {@link BlockGroup} representing keys and blocks.
+ * @throws IOException
+ */
+ List<BlockGroup> getExpiredOpenKeys() throws IOException;
+
+ /**
+ * Deletes an expired open key by its name. Called when a hanging key has
+ * been lingering for too long. Once called, the open key entry gets removed
+ * from OM metadata.
+ *
+ * @param objectKeyName object key name with #open# prefix.
+ * @throws IOException if specified key doesn't exist or other I/O errors.
+ */
+ void deleteExpiredOpenKey(String objectKeyName) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
new file mode 100644
index 0000000..ba92a29
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -0,0 +1,566 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.protocol.proto
+ .OzoneManagerProtocolProtos.KeyInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.BackgroundService;
+import org.apache.hadoop.utils.BatchOperation;
+import org.iq80.leveldb.DBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone
+ .OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT;
+import static org.apache.hadoop.ozone
+ .OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
+import static org.apache.hadoop.ozone
+ .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone
+ .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone
+ .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone
+ .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone
+ .OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE;
+import static org.apache.hadoop.ozone
+ .OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS_DEFAULT;
+import static org.apache.hadoop.ozone
+ .OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone
+ .OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB;
+import org.apache.hadoop.hdds.protocol
+ .proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol
+ .proto.HddsProtos.ReplicationFactor;
+
+
+/**
+ * Implementation of keyManager.
+ */
+public class KeyManagerImpl implements KeyManager {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(KeyManagerImpl.class);
+
+ /**
+ * A SCM block client, used to talk to SCM to allocate block during putKey.
+ */
+ private final ScmBlockLocationProtocol scmBlockClient;
+ private final OMMetadataManager metadataManager;
+ private final long scmBlockSize;
+ private final boolean useRatis;
+ private final BackgroundService keyDeletingService;
+ private final BackgroundService openKeyCleanupService;
+
+ private final long preallocateMax;
+ private final Random random;
+ private final String omId;
+
+ /**
+ * Creates a key manager together with its key deleting service and its
+ * open key cleanup service.
+ * <p>
+ * Reads the SCM block size, the Ratis flag, the preallocation limit and
+ * the service interval/timeout settings from {@code conf}. The services
+ * are only constructed here; they are started by {@link #start()}.
+ *
+ * @param scmBlockClient client used to talk to SCM for block operations.
+ * @param metadataManager metadata store backing this key manager.
+ * @param conf configuration the settings are read from.
+ * @param omId id passed to SCM when blocks are allocated.
+ */
+ public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
+ OMMetadataManager metadataManager, OzoneConfiguration conf,
+ String omId) {
+ this.scmBlockClient = scmBlockClient;
+ this.metadataManager = metadataManager;
+ // The configured value is scaled by OzoneConsts.MB (the key name says the
+ // setting is in MB).
+ this.scmBlockSize = conf.getLong(OZONE_SCM_BLOCK_SIZE_IN_MB,
+ OZONE_SCM_BLOCK_SIZE_DEFAULT) * OzoneConsts.MB;
+ this.useRatis = conf.getBoolean(DFS_CONTAINER_RATIS_ENABLED_KEY,
+ DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
+ // The key deleting service is driven by the block deleting service
+ // interval and timeout settings, both read in milliseconds.
+ long blockDeleteInterval = conf.getTimeDuration(
+ OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
+ OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ long serviceTimeout = conf.getTimeDuration(
+ OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
+ OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ this.preallocateMax = conf.getLong(
+ OZONE_KEY_PREALLOCATION_MAXSIZE,
+ OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT);
+ // NOTE(review): 'this' is handed to both services before random and omId
+ // are assigned below - confirm the services do not call back into this
+ // object during construction.
+ keyDeletingService = new KeyDeletingService(
+ scmBlockClient, this, blockDeleteInterval, serviceTimeout, conf);
+ // NOTE(review): this interval comes from a *_SECONDS key while the timeout
+ // passed alongside it is in milliseconds - confirm OpenKeyCleanupService
+ // expects exactly that.
+ int openkeyCheckInterval = conf.getInt(
+ OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS,
+ OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS_DEFAULT);
+ openKeyCleanupService = new OpenKeyCleanupService(
+ scmBlockClient, this, openkeyCheckInterval, serviceTimeout);
+ random = new Random();
+ this.omId = omId;
+ }
+
+ /**
+ * Returns the open key cleanup background service; exposed for tests.
+ *
+ * @return the open key cleanup service created by the constructor.
+ */
+ @VisibleForTesting
+ public BackgroundService getOpenKeyCleanupService() {
+ return openKeyCleanupService;
+ }
+
+ /**
+ * Starts the key deleting service and then the open key cleanup service.
+ */
+ @Override
+ public void start() {
+ keyDeletingService.start();
+ openKeyCleanupService.start();
+ }
+
+ /**
+ * Shuts down the key deleting service and then the open key cleanup
+ * service.
+ *
+ * @throws IOException declared by the interface; nothing in this body is
+ * visibly throwing it.
+ */
+ @Override
+ public void stop() throws IOException {
+ keyDeletingService.shutdown();
+ openKeyCleanupService.shutdown();
+ }
+
+ /**
+  * Verifies that the volume and the bucket both exist in the metadata
+  * store. The volume is checked first.
+  *
+  * @param volumeName name of the volume.
+  * @param bucketName name of the bucket inside that volume.
+  * @throws IOException an OMException with FAILED_VOLUME_NOT_FOUND or
+  * FAILED_BUCKET_NOT_FOUND when the respective entry is missing.
+  */
+ private void validateBucket(String volumeName, String bucketName)
+     throws IOException {
+   byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
+   byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+
+   // The volume has to exist.
+   boolean volumeMissing = metadataManager.get(volumeKey) == null;
+   if (volumeMissing) {
+     LOG.error("volume not found: {}", volumeName);
+     throw new OMException("Volume not found",
+         ResultCodes.FAILED_VOLUME_NOT_FOUND);
+   }
+
+   // The bucket has to exist.
+   boolean bucketMissing = metadataManager.get(bucketKey) == null;
+   if (bucketMissing) {
+     LOG.error("bucket not found: {}/{} ", volumeName, bucketName);
+     throw new OMException("Bucket not found",
+         ResultCodes.FAILED_BUCKET_NOT_FOUND);
+   }
+ }
+
+ /**
+  * Allocates one more block for an open key and appends it to the key's
+  * current block list.
+  * <p>
+  * The open key entry is looked up by key and client id, a block of
+  * {@code scmBlockSize} is requested from SCM using the key's replication
+  * type and factor, and the updated key info is written back to the open
+  * key entry. Runs under the metadata write lock.
+  *
+  * @param args the key to append.
+  * @param clientID the client requesting block.
+  * @return the location info of the newly allocated block.
+  * @throws IOException if the volume, bucket or open key entry is missing,
+  * or the SCM / metadata store call fails.
+  */
+ @Override
+ public OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID)
+     throws IOException {
+   Preconditions.checkNotNull(args);
+   metadataManager.writeLock().lock();
+   String volumeName = args.getVolumeName();
+   String bucketName = args.getBucketName();
+   String keyName = args.getKeyName();
+
+   try {
+     validateBucket(volumeName, bucketName);
+
+     String objectKey = metadataManager.getKeyWithDBPrefix(
+         volumeName, bucketName, keyName);
+     byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, clientID);
+     byte[] openKeyData = metadataManager.get(openKey);
+     if (openKeyData == null) {
+       LOG.error("Allocate block for a key not in open status in meta store " +
+           objectKey + " with ID " + clientID);
+       throw new OMException("Open Key not found",
+           ResultCodes.FAILED_KEY_NOT_FOUND);
+     }
+
+     OmKeyInfo openKeyInfo =
+         OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(openKeyData));
+     AllocatedBlock newBlock = scmBlockClient.allocateBlock(
+         scmBlockSize, openKeyInfo.getType(), openKeyInfo.getFactor(), omId);
+     OmKeyLocationInfo newLocation = new OmKeyLocationInfo.Builder()
+         .setBlockID(newBlock.getBlockID())
+         .setShouldCreateContainer(newBlock.getCreateContainer())
+         .setLength(scmBlockSize)
+         .setOffset(0)
+         .build();
+
+     // current version not committed, so new blocks coming now are added to
+     // the same version
+     openKeyInfo.appendNewBlocks(Collections.singletonList(newLocation));
+     openKeyInfo.updateModifcationTime();
+     metadataManager.put(openKey, openKeyInfo.getProtobuf().toByteArray());
+     return newLocation;
+   } finally {
+     metadataManager.writeLock().unlock();
+   }
+ }
+
+ /**
+  * Opens a key for writing and records it as an open key entry.
+  * <p>
+  * A missing replication type or factor is defaulted depending on whether
+  * Ratis is enabled. Blocks covering min(preallocation max, requested
+  * size) bytes are allocated from SCM up front. If the key already exists
+  * the blocks become a new version of it, otherwise a new key info with
+  * version 0 is created. The entry is stored under an open key name built
+  * from a random id that is not yet present in the metadata store.
+  *
+  * @param args the args of the key provided by client.
+  * @return an OpenKeySession carrying the id, the key info and the open
+  * version.
+  * @throws IOException always an OMException; failures that are not already
+  * an OMException are wrapped with result code FAILED_KEY_ALLOCATION.
+  */
+ @Override
+ public OpenKeySession openKey(OmKeyArgs args) throws IOException {
+   Preconditions.checkNotNull(args);
+   String volumeName = args.getVolumeName();
+   String bucketName = args.getBucketName();
+   String keyName = args.getKeyName();
+   ReplicationFactor factor = args.getFactor();
+   ReplicationType type = args.getType();
+
+   // If user does not specify a replication strategy or
+   // replication factor, OM will use defaults.
+   if (factor == null) {
+     factor = useRatis ? ReplicationFactor.THREE : ReplicationFactor.ONE;
+   }
+
+   if (type == null) {
+     type = useRatis ? ReplicationType.RATIS : ReplicationType.STAND_ALONE;
+   }
+
+   // Take the lock right before the try so nothing between acquiring and
+   // the finally-guarded region can leave it held.
+   metadataManager.writeLock().lock();
+   try {
+     validateBucket(volumeName, bucketName);
+     long requestedSize = Math.min(preallocateMax, args.getDataSize());
+     List<OmKeyLocationInfo> locations = new ArrayList<>();
+     String objectKey = metadataManager.getKeyWithDBPrefix(
+         volumeName, bucketName, keyName);
+     // requested size is not required but more like a optimization:
+     // SCM looks at the requested, if it 0, no block will be allocated at
+     // the point, if client needs more blocks, client can always call
+     // allocateBlock. But if requested size is not 0, OM will preallocate
+     // some blocks and piggyback to client, to save RPC calls.
+     while (requestedSize > 0) {
+       long allocateSize = Math.min(scmBlockSize, requestedSize);
+       AllocatedBlock allocatedBlock =
+           scmBlockClient.allocateBlock(allocateSize, type, factor, omId);
+       OmKeyLocationInfo subKeyInfo = new OmKeyLocationInfo.Builder()
+           .setBlockID(allocatedBlock.getBlockID())
+           .setShouldCreateContainer(allocatedBlock.getCreateContainer())
+           .setLength(allocateSize)
+           .setOffset(0)
+           .build();
+       locations.add(subKeyInfo);
+       requestedSize -= allocateSize;
+     }
+     // NOTE size of a key is not a hard limit on anything, it is a value that
+     // client should expect, in terms of current size of key. If client sets a
+     // value, then this value is used, otherwise, we allocate a single block
+     // which is the current size, if read by the client.
+     long size = args.getDataSize() >= 0 ? args.getDataSize() : scmBlockSize;
+     byte[] keyKey = metadataManager.getDBKeyBytes(
+         volumeName, bucketName, keyName);
+     byte[] value = metadataManager.get(keyKey);
+     OmKeyInfo keyInfo;
+     long openVersion;
+     if (value != null) {
+       // the key already exist, the new blocks will be added as new version
+       keyInfo = OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(value));
+       // when locations.size = 0, the new version will have identical blocks
+       // as its previous version
+       openVersion = keyInfo.addNewVersion(locations);
+       keyInfo.setDataSize(size + keyInfo.getDataSize());
+     } else {
+       // the key does not exist, create a new object, the new blocks are the
+       // version 0
+       long currentTime = Time.now();
+       keyInfo = new OmKeyInfo.Builder()
+           .setVolumeName(args.getVolumeName())
+           .setBucketName(args.getBucketName())
+           .setKeyName(args.getKeyName())
+           .setOmKeyLocationInfos(Collections.singletonList(
+               new OmKeyLocationInfoGroup(0, locations)))
+           .setCreationTime(currentTime)
+           .setModificationTime(currentTime)
+           .setDataSize(size)
+           .setReplicationType(type)
+           .setReplicationFactor(factor)
+           .build();
+       openVersion = 0;
+     }
+
+     // Generate a random ID which is not already in meta db. Success is
+     // tracked with an explicit flag: any int, including -1, is a valid
+     // random id, and after exhausting all attempts the last candidate must
+     // not be mistaken for an assigned id.
+     // in general this should finish in a couple times at most. putting some
+     // arbitrary large number here to avoid dead loop.
+     final int maxAttempts = 10000;
+     int id = 0;
+     boolean idAssigned = false;
+     for (int attempt = 0; attempt < maxAttempts && !idAssigned; attempt++) {
+       int candidate = random.nextInt();
+       byte[] openKey =
+           metadataManager.getOpenKeyNameBytes(objectKey, candidate);
+       if (metadataManager.get(openKey) == null) {
+         metadataManager.put(openKey, keyInfo.getProtobuf().toByteArray());
+         id = candidate;
+         idAssigned = true;
+       }
+     }
+     if (!idAssigned) {
+       throw new IOException("Failed to find a usable id for " + objectKey);
+     }
+     LOG.debug("Key {} allocated in volume {} bucket {}",
+         keyName, volumeName, bucketName);
+     return new OpenKeySession(id, keyInfo, openVersion);
+   } catch (OMException e) {
+     throw e;
+   } catch (IOException ex) {
+     // OMException is handled above, so everything arriving here is an
+     // unexpected I/O failure worth logging.
+     LOG.error("Key open failed for volume:{} bucket:{} key:{}",
+         volumeName, bucketName, keyName, ex);
+     throw new OMException(ex.getMessage(),
+         OMException.ResultCodes.FAILED_KEY_ALLOCATION);
+   } finally {
+     metadataManager.writeLock().unlock();
+   }
+ }
+
+ /**
+  * Commits an open key: the open key entry of the given client is removed
+  * and its key info, with the data size from {@code args} and a fresh
+  * modification time, is stored under the regular key name. Both changes
+  * are written as one batch under the metadata write lock.
+  *
+  * @param args the key to commit.
+  * @param clientID the client that is committing.
+  * @throws IOException always an OMException: FAILED_KEY_NOT_FOUND when no
+  * open key entry exists, FAILED_KEY_ALLOCATION for other I/O failures, or
+  * whatever bucket validation raised.
+  */
+ @Override
+ public void commitKey(OmKeyArgs args, int clientID) throws IOException {
+   Preconditions.checkNotNull(args);
+   metadataManager.writeLock().lock();
+   String volumeName = args.getVolumeName();
+   String bucketName = args.getBucketName();
+   String keyName = args.getKeyName();
+   try {
+     validateBucket(volumeName, bucketName);
+
+     String objectKey = metadataManager.getKeyWithDBPrefix(
+         volumeName, bucketName, keyName);
+     byte[] committedKey = metadataManager.getDBKeyBytes(volumeName,
+         bucketName, keyName);
+     byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, clientID);
+     byte[] openKeyData = metadataManager.get(openKey);
+     if (openKeyData == null) {
+       String message = "Commit a key without corresponding entry " +
+           DFSUtil.bytes2String(openKey);
+       throw new OMException(message, ResultCodes.FAILED_KEY_NOT_FOUND);
+     }
+
+     OmKeyInfo committedInfo =
+         OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(openKeyData));
+     committedInfo.setDataSize(args.getDataSize());
+     committedInfo.setModificationTime(Time.now());
+
+     // Drop the open entry and publish the key in a single batch.
+     BatchOperation commitBatch = new BatchOperation();
+     commitBatch.delete(openKey);
+     commitBatch.put(committedKey, committedInfo.getProtobuf().toByteArray());
+     metadataManager.writeBatch(commitBatch);
+   } catch (OMException omEx) {
+     throw omEx;
+   } catch (IOException ioEx) {
+     LOG.error("Key commit failed for volume:{} bucket:{} key:{}",
+         volumeName, bucketName, keyName, ioEx);
+     throw new OMException(ioEx.getMessage(),
+         ResultCodes.FAILED_KEY_ALLOCATION);
+   } finally {
+     metadataManager.writeLock().unlock();
+   }
+ }
+
+ /**
+  * Looks up a committed key and returns its key info.
+  * <p>
+  * The lookup only reads from the metadata store, so it runs under the
+  * metadata read lock and does not block other readers.
+  *
+  * @param args the args of the key provided by client.
+  * @return the OmKeyInfo stored for the key.
+  * @throws IOException an OMException with FAILED_KEY_NOT_FOUND when the key
+  * does not exist or the store raises a DBException; parsing failures of
+  * the stored value propagate unchanged.
+  */
+ @Override
+ public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
+   Preconditions.checkNotNull(args);
+   String volumeName = args.getVolumeName();
+   String bucketName = args.getBucketName();
+   String keyName = args.getKeyName();
+   // Read-only operation: the shared read lock is sufficient. It is taken
+   // right before the try so it is always released by the finally block.
+   metadataManager.readLock().lock();
+   try {
+     byte[] keyKey = metadataManager.getDBKeyBytes(
+         volumeName, bucketName, keyName);
+     byte[] value = metadataManager.get(keyKey);
+     if (value == null) {
+       LOG.debug("volume:{} bucket:{} Key:{} not found",
+           volumeName, bucketName, keyName);
+       throw new OMException("Key not found",
+           OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
+     }
+     return OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(value));
+   } catch (DBException ex) {
+     LOG.error("Get key failed for volume:{} bucket:{} key:{}",
+         volumeName, bucketName, keyName, ex);
+     throw new OMException(ex.getMessage(),
+         OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
+   } finally {
+     metadataManager.readLock().unlock();
+   }
+ }
+
+ /**
+  * Renames a key inside its bucket by moving its DB entry to the new name.
+  * Renaming an existing key to its current name is a no-op.
+  *
+  * @param args volume, bucket and current key name; must not be null.
+  * @param toKeyName the new key name; must not be null or empty.
+  * @throws OMException with FAILED_INVALID_KEY_NAME if either key name is
+  *         empty, FAILED_KEY_NOT_FOUND if the source key has no DB entry,
+  *         FAILED_KEY_ALREADY_EXISTS if the destination key already has one,
+  *         or FAILED_KEY_RENAME if the operation fails with a DBException.
+  * @throws IOException any other I/O error raised by the metadata store.
+  */
+ @Override
+ public void renameKey(OmKeyArgs args, String toKeyName) throws IOException {
+   Preconditions.checkNotNull(args);
+   Preconditions.checkNotNull(toKeyName);
+   String volumeName = args.getVolumeName();
+   String bucketName = args.getBucketName();
+   String fromKeyName = args.getKeyName();
+   if (toKeyName.length() == 0 || fromKeyName.length() == 0) {
+     LOG.error("Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}.",
+         volumeName, bucketName, fromKeyName, toKeyName);
+     throw new OMException("Key name is empty",
+         ResultCodes.FAILED_INVALID_KEY_NAME);
+   }
+
+   metadataManager.writeLock().lock();
+   try {
+     // fromKeyName should exist
+     byte[] fromKey = metadataManager.getDBKeyBytes(
+         volumeName, bucketName, fromKeyName);
+     byte[] fromKeyValue = metadataManager.get(fromKey);
+     if (fromKeyValue == null) {
+       // TODO: Add support for renaming open key
+       LOG.error(
+           "Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}. "
+               + "Key: {} not found.", volumeName, bucketName, fromKeyName,
+           toKeyName, fromKeyName);
+       throw new OMException("Key not found",
+           OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
+     }
+
+     // Renaming a key to itself is a no-op. This must be checked before the
+     // destination lookup: the destination is the source here, so it always
+     // exists and the check below would reject the request.
+     if (fromKeyName.equals(toKeyName)) {
+       return;
+     }
+
+     // toKeyName should not exist
+     byte[] toKey =
+         metadataManager.getDBKeyBytes(volumeName, bucketName, toKeyName);
+     byte[] toKeyValue = metadataManager.get(toKey);
+     if (toKeyValue != null) {
+       LOG.error(
+           "Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}. "
+               + "Key: {} already exists.", volumeName, bucketName,
+           fromKeyName, toKeyName, toKeyName);
+       throw new OMException("Key already exists",
+           OMException.ResultCodes.FAILED_KEY_ALREADY_EXISTS);
+     }
+
+     OmKeyInfo newKeyInfo =
+         OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(fromKeyValue));
+     newKeyInfo.setKeyName(toKeyName);
+     newKeyInfo.updateModifcationTime();
+     // Remove the old entry and add the new one in a single batch write.
+     BatchOperation batch = new BatchOperation();
+     batch.delete(fromKey);
+     batch.put(toKey, newKeyInfo.getProtobuf().toByteArray());
+     metadataManager.writeBatch(batch);
+   } catch (DBException ex) {
+     LOG.error("Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}.",
+         volumeName, bucketName, fromKeyName, toKeyName, ex);
+     // Keep the original exception as the cause so the stack trace survives.
+     throw new OMException(ex.getMessage(), ex,
+         ResultCodes.FAILED_KEY_RENAME);
+   } finally {
+     metadataManager.writeLock().unlock();
+   }
+ }
+
+ @Override
+ public void deleteKey(OmKeyArgs args) throws IOException {
+ Preconditions.checkNotNull(args);
+ metadataManager.writeLock().lock();
+ String volumeName = args.getVolumeName();
+ String bucketName = args.getBucketName();
+ String keyName = args.getKeyName();
+ try {
+ byte[] objectKey = metadataManager.getDBKeyBytes(
+ volumeName, bucketName, keyName);
+ byte[] objectValue = metadataManager.get(objectKey);
+ if (objectValue == null) {
+ throw new OMException("Key not found",
+ OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
+ }
+ byte[] deletingKey = metadataManager.getDeletedKeyName(objectKey);
+ BatchOperation batch = new BatchOperation();
+ batch.put(deletingKey, objectValue);
+ batch.delete(objectKey);
+ metadataManager.writeBatch(batch);
+ } catch (DBException ex) {
+ LOG.error(String.format("Delete key failed for volume:%s "
+ + "bucket:%s key:%s", volumeName, bucketName, keyName), ex);
+ throw new OMException(ex.getMessage(), ex,
+ ResultCodes.FAILED_KEY_DELETION);
+ } finally {
+ metadataManager.writeLock().unlock();
+ }
+ }
+
+ /**
+  * Lists keys of a bucket by delegating to the metadata manager while
+  * holding the metadata read lock.
+  *
+  * @param volumeName name of the volume; must not be null.
+  * @param bucketName name of the bucket; must not be null.
+  * @param startKey key name to start listing from, passed through unchanged.
+  * @param keyPrefix key name prefix filter, passed through unchanged.
+  * @param maxKeys maximum number of keys to return.
+  * @return the list produced by the metadata manager.
+  * @throws IOException if the metadata manager fails to list the keys.
+  */
+ @Override
+ public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
+     String startKey, String keyPrefix, int maxKeys) throws IOException {
+   Preconditions.checkNotNull(volumeName);
+   Preconditions.checkNotNull(bucketName);
+
+   final List<OmKeyInfo> keys;
+   metadataManager.readLock().lock();
+   try {
+     keys = metadataManager.listKeys(
+         volumeName, bucketName, startKey, keyPrefix, maxKeys);
+   } finally {
+     metadataManager.readLock().unlock();
+   }
+   return keys;
+ }
+
+ /**
+  * Fetches pending deletion keys from the metadata manager while holding
+  * the metadata read lock.
+  *
+  * @param count maximum number of keys to return, passed through unchanged.
+  * @return the block groups produced by the metadata manager.
+  * @throws IOException if the metadata manager fails to read the keys.
+  */
+ @Override
+ public List<BlockGroup> getPendingDeletionKeys(final int count)
+     throws IOException {
+   final List<BlockGroup> pendingKeys;
+   metadataManager.readLock().lock();
+   try {
+     pendingKeys = metadataManager.getPendingDeletionKeys(count);
+   } finally {
+     metadataManager.readLock().unlock();
+   }
+   return pendingKeys;
+ }
+
+ /**
+  * Removes the DB entry of a key that is pending deletion.
+  *
+  * @param objectKeyName DB key name of the entry; must not be null and must
+  *                      start with the deleting-key prefix.
+  * @throws IllegalArgumentException if the name lacks the deleting prefix.
+  * @throws IOException if no entry with that name exists in the DB, or the
+  *         metadata store fails.
+  */
+ @Override
+ public void deletePendingDeletionKey(String objectKeyName)
+     throws IOException {
+   Preconditions.checkNotNull(objectKeyName);
+   final boolean hasDeletingPrefix =
+       objectKeyName.startsWith(OzoneConsts.DELETING_KEY_PREFIX);
+   if (!hasDeletingPrefix) {
+     throw new IllegalArgumentException("Invalid key name,"
+         + " the name should be the key name with deleting prefix");
+   }
+
+   // The entry is only dropped from the OM DB; nothing else is touched here.
+   metadataManager.writeLock().lock();
+   try {
+     final byte[] dbKey = DFSUtil.string2Bytes(objectKeyName);
+     if (metadataManager.get(dbKey) == null) {
+       throw new IOException("Failed to delete key " + objectKeyName
+           + " because it is not found in DB");
+     }
+     metadataManager.delete(dbKey);
+   } finally {
+     metadataManager.writeLock().unlock();
+   }
+ }
+
+ /**
+  * Fetches expired open keys from the metadata manager while holding the
+  * metadata read lock.
+  *
+  * @return the block groups produced by the metadata manager.
+  * @throws IOException if the metadata manager fails to read the keys.
+  */
+ @Override
+ public List<BlockGroup> getExpiredOpenKeys() throws IOException {
+   final List<BlockGroup> expiredKeys;
+   metadataManager.readLock().lock();
+   try {
+     expiredKeys = metadataManager.getExpiredOpenKeys();
+   } finally {
+     metadataManager.readLock().unlock();
+   }
+   return expiredKeys;
+ }
+
+ /**
+  * Removes the DB entry of an open key.
+  *
+  * @param objectKeyName DB key name of the entry; must not be null and must
+  *                      start with the open-key prefix.
+  * @throws IllegalArgumentException if the name lacks the open-key prefix.
+  * @throws IOException if no entry with that name exists in the DB, or the
+  *         metadata store fails.
+  */
+ @Override
+ public void deleteExpiredOpenKey(String objectKeyName) throws IOException {
+   Preconditions.checkNotNull(objectKeyName);
+   final boolean hasOpenPrefix =
+       objectKeyName.startsWith(OzoneConsts.OPEN_KEY_PREFIX);
+   if (!hasOpenPrefix) {
+     throw new IllegalArgumentException("Invalid key name,"
+         + " the name should be the key name with open key prefix");
+   }
+
+   // The entry is only dropped from the OM DB; nothing else is touched here.
+   metadataManager.writeLock().lock();
+   try {
+     final byte[] dbKey = DFSUtil.string2Bytes(objectKeyName);
+     if (metadataManager.get(dbKey) == null) {
+       throw new IOException("Failed to delete key " + objectKeyName
+           + " because it is not found in DB");
+     }
+     metadataManager.delete(dbKey);
+   } finally {
+     metadataManager.writeLock().unlock();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMXBean.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMXBean.java
new file mode 100644
index 0000000..3ab9f47
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMXBean.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.server.ServiceRuntimeInfo;
+
+/**
+ * This is the JMX management interface for OM (Ozone Manager) information.
+ * <p>
+ * It extends {@link ServiceRuntimeInfo} and additionally declares
+ * {@link #getRpcPort()}, which reports an RPC port as a string.
+ * NOTE(review): presumably this is the port the OM RPC server listens on;
+ * confirm against the implementing class.
+ */
+@InterfaceAudience.Private
+public interface OMMXBean extends ServiceRuntimeInfo {
+
+ String getRpcPort();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
new file mode 100644
index 0000000..f2e78e6
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataStore;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * OM metadata manager interface.
+ * <p>
+ * Declares the operations available on the OM metadata DB: start/stop,
+ * access to the read and write locks, raw get/put/delete and batched writes,
+ * construction of DB key names for volumes, users, buckets and keys, and
+ * listing of volumes, buckets, keys, pending deletion keys and open keys.
+ */
+public interface OMMetadataManager {
+ /**
+ * Start metadata manager.
+ */
+ void start();
+
+ /**
+ * Stop metadata manager.
+ * @throws IOException if stopping fails.
+ */
+ void stop() throws IOException;
+
+ /**
+ * Get metadata store.
+ * @return metadata store.
+ */
+ @VisibleForTesting
+ MetadataStore getStore();
+
+ /**
+ * Returns the read lock used on Metadata DB.
+ * @return readLock
+ */
+ Lock readLock();
+
+ /**
+ * Returns the write lock used on Metadata DB.
+ * @return writeLock
+ */
+ Lock writeLock();
+
+ /**
+ * Returns the value associated with this key.
+ * @param key - key
+ * @return value
+ * @throws IOException if the read fails.
+ */
+ byte[] get(byte[] key) throws IOException;
+
+ /**
+ * Puts a Key into Metadata DB.
+ * @param key - key
+ * @param value - value
+ * @throws IOException if the write fails.
+ */
+ void put(byte[] key, byte[] value) throws IOException;
+
+ /**
+ * Deletes a Key from Metadata DB.
+ * @param key - key
+ * @throws IOException if the delete fails.
+ */
+ void delete(byte[] key) throws IOException;
+
+ /**
+ * Atomically writes a batch of operations.
+ * @param batch - the operations to write
+ * @throws IOException if the batch write fails.
+ */
+ void writeBatch(BatchOperation batch) throws IOException;
+
+ /**
+ * Given a volume return the corresponding DB key.
+ * @param volume - Volume name
+ * @return bytes of DB key.
+ */
+ byte[] getVolumeKey(String volume);
+
+ /**
+ * Given a user return the corresponding DB key.
+ * @param user - User name
+ * @return bytes of DB key.
+ */
+ byte[] getUserKey(String user);
+
+ /**
+ * Given a volume and bucket, return the corresponding DB key.
+ * @param volume - Volume name
+ * @param bucket - Bucket name
+ * @return bytes of DB key.
+ */
+ byte[] getBucketKey(String volume, String bucket);
+
+ /**
+ * Given a volume, bucket and a key, return the corresponding DB key.
+ * @param volume - volume name
+ * @param bucket - bucket name
+ * @param key - key name
+ * @return bytes of DB key.
+ */
+ byte[] getDBKeyBytes(String volume, String bucket, String key);
+
+ /**
+ * Returns the DB key name of a deleted key in OM metadata store.
+ * The name for a deleted key has prefix #deleting# followed by
+ * the actual key name.
+ * @param keyName - key name
+ * @return bytes of DB key.
+ */
+ byte[] getDeletedKeyName(byte[] keyName);
+
+ /**
+ * Returns the DB key name of an open key in OM metadata store.
+ * Should be #open# prefix followed by actual key name.
+ * @param keyName - key name
+ * @param id - the id for this open
+ * @return bytes of DB key.
+ */
+ byte[] getOpenKeyNameBytes(String keyName, int id);
+
+ /**
+ * Returns the full name of a key given volume name, bucket name and key name.
+ * Generally done by padding certain delimiters.
+ *
+ * @param volumeName - volume name
+ * @param bucketName - bucket name
+ * @param keyName - key name
+ * @return the full key name.
+ */
+ String getKeyWithDBPrefix(String volumeName, String bucketName,
+ String keyName);
+
+ /**
+ * Given a volume, check if it is empty,
+ * i.e. there are no buckets inside it.
+ * @param volume - Volume name
+ * @return true if the volume is empty
+ * @throws IOException if the check fails.
+ */
+ boolean isVolumeEmpty(String volume) throws IOException;
+
+ /**
+ * Given a volume/bucket, check if it is empty,
+ * i.e. there are no keys inside it.
+ * @param volume - Volume name
+ * @param bucket - Bucket name
+ * @return true if the bucket is empty
+ * @throws IOException if the check fails.
+ */
+ boolean isBucketEmpty(String volume, String bucket) throws IOException;
+
+ /**
+ * Returns a list of buckets represented by {@link OmBucketInfo}
+ * in the given volume.
+ *
+ * @param volumeName
+ * the name of the volume. This argument is required,
+ * this method returns buckets in this given volume.
+ * @param startBucket
+ * the start bucket name. Only the buckets whose name is
+ * after this value will be included in the result.
+ * This key is excluded from the result.
+ * @param bucketPrefix
+ * bucket name prefix. Only the buckets whose name has
+ * this prefix will be included in the result.
+ * @param maxNumOfBuckets
+ * the maximum number of buckets to return. It ensures
+ * the size of the result will not exceed this limit.
+ * @return a list of buckets.
+ * @throws IOException if the listing fails.
+ */
+ List<OmBucketInfo> listBuckets(String volumeName, String startBucket,
+ String bucketPrefix, int maxNumOfBuckets) throws IOException;
+
+ /**
+ * Returns a list of keys represented by {@link OmKeyInfo}
+ * in the given bucket.
+ *
+ * @param volumeName
+ * the name of the volume.
+ * @param bucketName
+ * the name of the bucket.
+ * @param startKey
+ * the start key name, only the keys whose name is
+ * after this value will be included in the result.
+ * This key is excluded from the result.
+ * @param keyPrefix
+ * key name prefix, only the keys whose name has
+ * this prefix will be included in the result.
+ * @param maxKeys
+ * the maximum number of keys to return. It ensures
+ * the size of the result will not exceed this limit.
+ * @return a list of keys.
+ * @throws IOException if the listing fails.
+ */
+ List<OmKeyInfo> listKeys(String volumeName,
+ String bucketName, String startKey, String keyPrefix, int maxKeys)
+ throws IOException;
+
+ /**
+ * Returns a list of volumes owned by a given user; if user is null,
+ * returns all volumes.
+ *
+ * @param userName
+ * volume owner
+ * @param prefix
+ * the volume prefix used to filter the listing result.
+ * @param startKey
+ * the start volume name determines where to start listing from,
+ * this key is excluded from the result.
+ * @param maxKeys
+ * the maximum number of volumes to return.
+ * @return a list of {@link OmVolumeArgs}
+ * @throws IOException if the listing fails.
+ */
+ List<OmVolumeArgs> listVolumes(String userName, String prefix,
+ String startKey, int maxKeys) throws IOException;
+
+ /**
+ * Returns a list of pending deletion key info, up to the given count.
+ * Each entry is a {@link BlockGroup}, which contains the info about the
+ * key name and all its associated block IDs. A pending deletion key is
+ * stored with #deleting# prefix in OM DB.
+ *
+ * @param count max number of keys to return.
+ * @return a list of {@link BlockGroup} representing keys and blocks.
+ * @throws IOException if the keys cannot be read.
+ */
+ List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
+
+ /**
+ * Returns a list of info about keys that are still open. Each entry
+ * contains the key name and all its associated block IDs. A pending open
+ * key has prefix #open# in OM DB.
+ * NOTE(review): the method name suggests only expired open keys are
+ * returned; confirm against the implementation.
+ *
+ * @return a list of {@link BlockGroup} representing keys and blocks.
+ * @throws IOException if the keys cannot be read.
+ */
+ List<BlockGroup> getExpiredOpenKeys() throws IOException;
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org