Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2018/07/09 18:26:23 UTC

[31/50] [abbrv] hadoop git commit: HDDS-167. Rename KeySpaceManager to OzoneManager. Contributed by Arpit Agarwal.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
deleted file mode 100644
index cc2f78a..0000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/VolumeManagerImpl.java
+++ /dev/null
@@ -1,391 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.ksm;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.OzoneAclInfo;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.VolumeList;
-import org.apache.hadoop.ozone.protocol.proto
-    .KeySpaceManagerProtocolProtos.VolumeInfo;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.utils.BatchOperation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
-    .OZONE_KSM_USER_MAX_VOLUME_DEFAULT;
-import static org.apache.hadoop.ozone.ksm.KSMConfigKeys
-    .OZONE_KSM_USER_MAX_VOLUME;
-import static org.apache.hadoop.ozone.ksm.exceptions
-    .KSMException.ResultCodes;
-
-/**
- * KSM volume management code.
- */
-public class VolumeManagerImpl implements VolumeManager {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(VolumeManagerImpl.class);
-
-  private final KSMMetadataManager metadataManager;
-  private final int maxUserVolumeCount;
-
-  /**
-   * Constructor.
-   * @param conf - Ozone configuration.
-   * @throws IOException
-   */
-  public VolumeManagerImpl(KSMMetadataManager metadataManager,
-      OzoneConfiguration conf) throws IOException {
-    this.metadataManager = metadataManager;
-    this.maxUserVolumeCount = conf.getInt(OZONE_KSM_USER_MAX_VOLUME,
-        OZONE_KSM_USER_MAX_VOLUME_DEFAULT);
-  }
-
-  // Helpers to add and delete volume from user list
-  private void addVolumeToOwnerList(String volume, String owner,
-      BatchOperation batchOperation) throws IOException {
-    // Get the volume list
-    byte[] dbUserKey = metadataManager.getUserKey(owner);
-    byte[] volumeList  = metadataManager.get(dbUserKey);
-    List<String> prevVolList = new LinkedList<>();
-    if (volumeList != null) {
-      VolumeList vlist = VolumeList.parseFrom(volumeList);
-      prevVolList.addAll(vlist.getVolumeNamesList());
-    }
-
-    // Check the volume count
-    if (prevVolList.size() >= maxUserVolumeCount) {
-      LOG.debug("Too many volumes for user:{}", owner);
-      throw new KSMException(ResultCodes.FAILED_TOO_MANY_USER_VOLUMES);
-    }
-
-    // Add the new volume to the list
-    prevVolList.add(volume);
-    VolumeList newVolList = VolumeList.newBuilder()
-        .addAllVolumeNames(prevVolList).build();
-    batchOperation.put(dbUserKey, newVolList.toByteArray());
-  }
-
-  private void delVolumeFromOwnerList(String volume, String owner,
-                                      BatchOperation batchOperation)
-      throws IOException {
-    // Get the volume list
-    byte[] dbUserKey = metadataManager.getUserKey(owner);
-    byte[] volumeList  = metadataManager.get(dbUserKey);
-    List<String> prevVolList = new LinkedList<>();
-    if (volumeList != null) {
-      VolumeList vlist = VolumeList.parseFrom(volumeList);
-      prevVolList.addAll(vlist.getVolumeNamesList());
-    } else {
-      LOG.debug("volume:{} not found for user:{}");
-      throw new KSMException(ResultCodes.FAILED_USER_NOT_FOUND);
-    }
-
-    // Remove the volume from the list
-    prevVolList.remove(volume);
-    if (prevVolList.size() == 0) {
-      batchOperation.delete(dbUserKey);
-    } else {
-      VolumeList newVolList = VolumeList.newBuilder()
-          .addAllVolumeNames(prevVolList).build();
-      batchOperation.put(dbUserKey, newVolList.toByteArray());
-    }
-  }
-
-  /**
-   * Creates a volume.
-   * @param args - KsmVolumeArgs.
-   */
-  @Override
-  public void createVolume(KsmVolumeArgs args) throws IOException {
-    Preconditions.checkNotNull(args);
-    metadataManager.writeLock().lock();
-    try {
-      byte[] dbVolumeKey = metadataManager.getVolumeKey(args.getVolume());
-      byte[] volumeInfo = metadataManager.get(dbVolumeKey);
-
-      // Check if the volume already exists
-      if (volumeInfo != null) {
-        LOG.debug("volume:{} already exists", args.getVolume());
-        throw new KSMException(ResultCodes.FAILED_VOLUME_ALREADY_EXISTS);
-      }
-
-      BatchOperation batch = new BatchOperation();
-      // Write the vol info
-      List<HddsProtos.KeyValue> metadataList = new LinkedList<>();
-      for (Map.Entry<String, String> entry : args.getKeyValueMap().entrySet()) {
-        metadataList.add(HddsProtos.KeyValue.newBuilder()
-            .setKey(entry.getKey()).setValue(entry.getValue()).build());
-      }
-      List<OzoneAclInfo> aclList = args.getAclMap().ozoneAclGetProtobuf();
-
-      VolumeInfo newVolumeInfo = VolumeInfo.newBuilder()
-          .setAdminName(args.getAdminName())
-          .setOwnerName(args.getOwnerName())
-          .setVolume(args.getVolume())
-          .setQuotaInBytes(args.getQuotaInBytes())
-          .addAllMetadata(metadataList)
-          .addAllVolumeAcls(aclList)
-          .setCreationTime(Time.now())
-          .build();
-      batch.put(dbVolumeKey, newVolumeInfo.toByteArray());
-
-      // Add volume to user list
-      addVolumeToOwnerList(args.getVolume(), args.getOwnerName(), batch);
-      metadataManager.writeBatch(batch);
-      LOG.debug("created volume:{} user:{}", args.getVolume(),
-          args.getOwnerName());
-    } catch (IOException ex) {
-      if (!(ex instanceof KSMException)) {
-        LOG.error("Volume creation failed for user:{} volume:{}",
-            args.getOwnerName(), args.getVolume(), ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Changes the owner of a volume.
-   *
-   * @param volume - Name of the volume.
-   * @param owner - Name of the owner.
-   * @throws IOException
-   */
-  @Override
-  public void setOwner(String volume, String owner) throws IOException {
-    Preconditions.checkNotNull(volume);
-    Preconditions.checkNotNull(owner);
-    metadataManager.writeLock().lock();
-    try {
-      byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
-      byte[] volInfo = metadataManager.get(dbVolumeKey);
-      if (volInfo == null) {
-        LOG.debug("Changing volume ownership failed for user:{} volume:{}",
-            owner, volume);
-        throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
-      }
-
-      VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
-      KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
-      Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
-
-      BatchOperation batch = new BatchOperation();
-      delVolumeFromOwnerList(volume, volumeArgs.getOwnerName(), batch);
-      addVolumeToOwnerList(volume, owner, batch);
-
-      KsmVolumeArgs newVolumeArgs =
-          KsmVolumeArgs.newBuilder().setVolume(volumeArgs.getVolume())
-              .setAdminName(volumeArgs.getAdminName())
-              .setOwnerName(owner)
-              .setQuotaInBytes(volumeArgs.getQuotaInBytes())
-              .setCreationTime(volumeArgs.getCreationTime())
-              .build();
-
-      VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf();
-      batch.put(dbVolumeKey, newVolumeInfo.toByteArray());
-
-      metadataManager.writeBatch(batch);
-    } catch (IOException ex) {
-      if (!(ex instanceof KSMException)) {
-        LOG.error("Changing volume ownership failed for user:{} volume:{}",
-            owner, volume, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Changes the Quota on a volume.
-   *
-   * @param volume - Name of the volume.
-   * @param quota - Quota in bytes.
-   * @throws IOException
-   */
-  public void setQuota(String volume, long quota) throws IOException {
-    Preconditions.checkNotNull(volume);
-    metadataManager.writeLock().lock();
-    try {
-      byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
-      byte[] volInfo = metadataManager.get(dbVolumeKey);
-      if (volInfo == null) {
-        LOG.debug("volume:{} does not exist", volume);
-        throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
-      }
-
-      VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
-      KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
-      Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
-
-      KsmVolumeArgs newVolumeArgs =
-          KsmVolumeArgs.newBuilder()
-              .setVolume(volumeArgs.getVolume())
-              .setAdminName(volumeArgs.getAdminName())
-              .setOwnerName(volumeArgs.getOwnerName())
-              .setQuotaInBytes(quota)
-              .setCreationTime(volumeArgs.getCreationTime()).build();
-
-      VolumeInfo newVolumeInfo = newVolumeArgs.getProtobuf();
-      metadataManager.put(dbVolumeKey, newVolumeInfo.toByteArray());
-    } catch (IOException ex) {
-      if (!(ex instanceof KSMException)) {
-        LOG.error("Changing volume quota failed for volume:{} quota:{}", volume,
-            quota, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Gets the volume information.
-   * @param volume - Volume name.
-   * @return KsmVolumeArgs, or an exception is thrown.
-   * @throws IOException
-   */
-  public KsmVolumeArgs getVolumeInfo(String volume) throws IOException {
-    Preconditions.checkNotNull(volume);
-    metadataManager.readLock().lock();
-    try {
-      byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
-      byte[] volInfo = metadataManager.get(dbVolumeKey);
-      if (volInfo == null) {
-        LOG.debug("volume:{} does not exist", volume);
-        throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
-      }
-
-      VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
-      KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
-      Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
-      return volumeArgs;
-    } catch (IOException ex) {
-      if (!(ex instanceof KSMException)) {
-        LOG.warn("Info volume failed for volume:{}", volume, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.readLock().unlock();
-    }
-  }
-
-  /**
-   * Deletes an existing empty volume.
-   *
-   * @param volume - Name of the volume.
-   * @throws IOException
-   */
-  @Override
-  public void deleteVolume(String volume) throws IOException {
-    Preconditions.checkNotNull(volume);
-    metadataManager.writeLock().lock();
-    try {
-      BatchOperation batch = new BatchOperation();
-      byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
-      byte[] volInfo = metadataManager.get(dbVolumeKey);
-      if (volInfo == null) {
-        LOG.debug("volume:{} does not exist", volume);
-        throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
-      }
-
-      if (!metadataManager.isVolumeEmpty(volume)) {
-        LOG.debug("volume:{} is not empty", volume);
-        throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_EMPTY);
-      }
-
-      VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
-      Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
-      // delete the volume from the owner list
-      // as well as delete the volume entry
-      delVolumeFromOwnerList(volume, volumeInfo.getOwnerName(), batch);
-      batch.delete(dbVolumeKey);
-      metadataManager.writeBatch(batch);
-    } catch (IOException ex) {
-      if (!(ex instanceof KSMException)) {
-        LOG.error("Delete volume failed for volume:{}", volume, ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Checks if the specified user with a role can access this volume.
-   *
-   * @param volume - volume
-   * @param userAcl - user acl which needs to be checked for access
-   * @return true if the user has access for the volume, false otherwise
-   * @throws IOException
-   */
-  public boolean checkVolumeAccess(String volume, OzoneAclInfo userAcl)
-      throws IOException {
-    Preconditions.checkNotNull(volume);
-    Preconditions.checkNotNull(userAcl);
-    metadataManager.readLock().lock();
-    try {
-      byte[] dbVolumeKey = metadataManager.getVolumeKey(volume);
-      byte[] volInfo = metadataManager.get(dbVolumeKey);
-      if (volInfo == null) {
-        LOG.debug("volume:{} does not exist", volume);
-        throw new KSMException(ResultCodes.FAILED_VOLUME_NOT_FOUND);
-      }
-
-      VolumeInfo volumeInfo = VolumeInfo.parseFrom(volInfo);
-      KsmVolumeArgs volumeArgs = KsmVolumeArgs.getFromProtobuf(volumeInfo);
-      Preconditions.checkState(volume.equals(volumeInfo.getVolume()));
-      return volumeArgs.getAclMap().hasAccess(userAcl);
-    } catch (IOException ex) {
-      if (!(ex instanceof KSMException)) {
-        LOG.error("Check volume access failed for volume:{} user:{} rights:{}",
-            volume, userAcl.getName(), userAcl.getRights(), ex);
-      }
-      throw ex;
-    } finally {
-      metadataManager.readLock().unlock();
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  @Override
-  public List<KsmVolumeArgs> listVolumes(String userName,
-      String prefix, String startKey, int maxKeys) throws IOException {
-    metadataManager.readLock().lock();
-    try {
-      return metadataManager.listVolumes(
-          userName, prefix, startKey, maxKeys);
-    } finally {
-      metadataManager.readLock().unlock();
-    }
-  }
-}
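
For context: the addVolumeToOwnerList/delVolumeFromOwnerList helpers above follow a read-modify-write pattern, where the owner's DB key maps to a serialized VolumeList that is parsed, checked against the per-user volume limit, updated, and written back in the same batch. A minimal standalone sketch of that pattern, with an in-memory map standing in for the metadata DB and plain strings in place of the protobuf list (all names here are illustrative, not part of the Ozone API):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class OwnerListSketch {
      // Stand-in for the metadata DB: owner name -> list of volume names.
      private final Map<String, List<String>> db = new HashMap<>();
      private final int maxUserVolumeCount;

      OwnerListSketch(int maxUserVolumeCount) {
        this.maxUserVolumeCount = maxUserVolumeCount;
      }

      // Mirrors addVolumeToOwnerList: read the current list, enforce the
      // per-user limit, then write the updated list back.
      void addVolumeToOwner(String volume, String owner) throws IOException {
        List<String> volumes = db.getOrDefault(owner, new ArrayList<>());
        if (volumes.size() >= maxUserVolumeCount) {
          throw new IOException("Too many volumes for user: " + owner);
        }
        volumes.add(volume);
        db.put(owner, volumes);
      }

      // Mirrors delVolumeFromOwnerList: the owner's entry is dropped once
      // its last volume is removed.
      void delVolumeFromOwner(String volume, String owner) throws IOException {
        List<String> volumes = db.get(owner);
        if (volumes == null) {
          throw new IOException("User not found: " + owner);
        }
        volumes.remove(volume);
        if (volumes.isEmpty()) {
          db.remove(owner);
        } else {
          db.put(owner, volumes);
        }
      }

      public static void main(String[] args) throws IOException {
        OwnerListSketch sketch = new OwnerListSketch(2);
        sketch.addVolumeToOwner("vol1", "alice");
        sketch.addVolumeToOwner("vol2", "alice");
        sketch.delVolumeFromOwner("vol1", "alice");
        System.out.println("remaining: " + sketch.db.get("alice"));
      }
    }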

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
deleted file mode 100644
index b902eab..0000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/KSMException.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.ksm.exceptions;
-
-import java.io.IOException;
-
-/**
- * Exception thrown by KSM.
- */
-public class KSMException extends IOException {
-  private final KSMException.ResultCodes result;
-
-  /**
-   * Constructs an {@code IOException} with {@code null}
-   * as its error detail message.
-   */
-  public KSMException(KSMException.ResultCodes result) {
-    this.result = result;
-  }
-
-  /**
-   * Constructs an {@code IOException} with the specified detail message.
-   *
-   * @param message The detail message (which is saved for later retrieval by
-   * the
-   * {@link #getMessage()} method)
-   */
-  public KSMException(String message, KSMException.ResultCodes result) {
-    super(message);
-    this.result = result;
-  }
-
-  /**
-   * Constructs an {@code IOException} with the specified detail message
-   * and cause.
-   * <p>
-   * <p> Note that the detail message associated with {@code cause} is
-   * <i>not</i> automatically incorporated into this exception's detail
-   * message.
-   *
-   * @param message The detail message (which is saved for later retrieval by
-   * the
-   * {@link #getMessage()} method)
-   * @param cause The cause (which is saved for later retrieval by the {@link
-   * #getCause()} method).  (A null value is permitted, and indicates that the
-   * cause is nonexistent or unknown.)
-   * @since 1.6
-   */
-  public KSMException(String message, Throwable cause,
-                      KSMException.ResultCodes result) {
-    super(message, cause);
-    this.result = result;
-  }
-
-  /**
-   * Constructs an {@code IOException} with the specified cause and a
-   * detail message of {@code (cause==null ? null : cause.toString())}
-   * (which typically contains the class and detail message of {@code cause}).
-   * This constructor is useful for IO exceptions that are little more
-   * than wrappers for other throwables.
-   *
-   * @param cause The cause (which is saved for later retrieval by the {@link
-   * #getCause()} method).  (A null value is permitted, and indicates that the
-   * cause is nonexistent or unknown.)
-   * @since 1.6
-   */
-  public KSMException(Throwable cause, KSMException.ResultCodes result) {
-    super(cause);
-    this.result = result;
-  }
-
-  /**
-   * Returns resultCode.
-   * @return ResultCode
-   */
-  public KSMException.ResultCodes getResult() {
-    return result;
-  }
-
-  /**
-   * Error codes to make it easy to decode these exceptions.
-   */
-  public enum ResultCodes {
-    FAILED_TOO_MANY_USER_VOLUMES,
-    FAILED_VOLUME_ALREADY_EXISTS,
-    FAILED_VOLUME_NOT_FOUND,
-    FAILED_VOLUME_NOT_EMPTY,
-    FAILED_USER_NOT_FOUND,
-    FAILED_BUCKET_ALREADY_EXISTS,
-    FAILED_BUCKET_NOT_FOUND,
-    FAILED_BUCKET_NOT_EMPTY,
-    FAILED_KEY_ALREADY_EXISTS,
-    FAILED_KEY_NOT_FOUND,
-    FAILED_KEY_ALLOCATION,
-    FAILED_KEY_DELETION,
-    FAILED_KEY_RENAME,
-    FAILED_INVALID_KEY_NAME,
-    FAILED_METADATA_ERROR,
-    FAILED_INTERNAL_ERROR,
-    KSM_NOT_INITIALIZED,
-    SCM_VERSION_MISMATCH_ERROR
-  }
-}
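
Carrying a ResultCodes value with the exception lets callers branch on the failure type without parsing message strings. A minimal sketch of caller-side handling against the (pre-rename) class above; the mapper class and status strings are illustrative, not part of the Ozone API:

    import org.apache.hadoop.ozone.ksm.exceptions.KSMException;

    final class KsmErrorMapper {
      private KsmErrorMapper() { }

      // Illustrative only: translate a KSMException into a wire-level
      // status string by its result code rather than its message text.
      static String toStatus(KSMException ex) {
        switch (ex.getResult()) {
        case FAILED_VOLUME_NOT_FOUND:
          return "VOLUME_NOT_FOUND";
        case FAILED_VOLUME_ALREADY_EXISTS:
          return "VOLUME_ALREADY_EXISTS";
        case FAILED_TOO_MANY_USER_VOLUMES:
          return "USER_TOO_MANY_VOLUMES";
        default:
          return "INTERNAL_ERROR";
        }
      }
    }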

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/package-info.java
deleted file mode 100644
index 09fd87f..0000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/exceptions/package-info.java
+++ /dev/null
@@ -1,19 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.ksm.exceptions;
-// Exception thrown by KSM.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/package-info.java
deleted file mode 100644
index 09d9f32..0000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/ksm/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.ozone.ksm;
-/*
- This package contains the keyspace manager classes.
- */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManager.java
new file mode 100644
index 0000000..ddb2b0e
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManager.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * BucketManager handles all the bucket level operations.
+ */
+public interface BucketManager {
+  /**
+   * Creates a bucket.
+   * @param bucketInfo - OmBucketInfo for creating bucket.
+   */
+  void createBucket(OmBucketInfo bucketInfo) throws IOException;
+  /**
+   * Returns Bucket Information.
+   * @param volumeName - Name of the Volume.
+   * @param bucketName - Name of the Bucket.
+   */
+  OmBucketInfo getBucketInfo(String volumeName, String bucketName)
+      throws IOException;
+
+  /**
+   * Sets bucket property from args.
+   * @param args - BucketArgs.
+   * @throws IOException
+   */
+  void setBucketProperty(OmBucketArgs args) throws IOException;
+
+  /**
+   * Deletes an existing empty bucket from volume.
+   * @param volumeName - Name of the volume.
+   * @param bucketName - Name of the bucket.
+   * @throws IOException
+   */
+  void deleteBucket(String volumeName, String bucketName) throws IOException;
+
+  /**
+   * Returns a list of buckets represented by {@link OmBucketInfo}
+   * in the given volume.
+   *
+   * @param volumeName
+   *   Required parameter volume name determines buckets in which volume
+   *   to return.
+   * @param startBucket
+   *   Optional start bucket name parameter indicating where to start
+   *   the bucket listing from; this key is excluded from the result.
+   * @param bucketPrefix
+   *   Optional prefix parameter, restricting the response to buckets
+   *   that begin with the specified name.
+   * @param maxNumOfBuckets
+   *   The maximum number of buckets to return. It ensures
+   *   the size of the result will not exceed this limit.
+   * @return a list of buckets.
+   * @throws IOException
+   */
+  List<OmBucketInfo> listBuckets(String volumeName,
+                                 String startBucket, String bucketPrefix, int maxNumOfBuckets)
+      throws IOException;
+}
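
Because startBucket is excluded from the result, a caller can page through a volume by feeding the last bucket name of each batch back in. A minimal pagination sketch against the interface above (the null start/prefix arguments and the batch size are assumptions for illustration):

    import org.apache.hadoop.ozone.om.BucketManager;
    import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    final class ListAllBuckets {
      private ListAllBuckets() { }

      // Pages through every bucket in a volume, batchSize at a time,
      // resuming each call from the last name seen (startBucket is
      // exclusive, so no entry is returned twice).
      static List<OmBucketInfo> listAll(BucketManager buckets, String volume,
          int batchSize) throws IOException {
        List<OmBucketInfo> all = new ArrayList<>();
        String startBucket = null;  // assumed to mean "from the beginning"
        while (true) {
          List<OmBucketInfo> batch =
              buckets.listBuckets(volume, startBucket, null, batchSize);
          all.addAll(batch);
          if (batch.size() < batchSize) {
            return all;  // short batch: nothing further to list
          }
          startBucket = batch.get(batch.size() - 1).getBucketName();
        }
      }
    }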

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
new file mode 100644
index 0000000..4bbce81
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/BucketManagerImpl.java
@@ -0,0 +1,315 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.protocol.proto
+    .OzoneManagerProtocolProtos.BucketInfo;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.util.Time;
+import org.iq80.leveldb.DBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * OM bucket manager.
+ */
+public class BucketManagerImpl implements BucketManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BucketManagerImpl.class);
+
+  /**
+   * OMMetadataManager is used for accessing OM MetadataDB and ReadWriteLock.
+   */
+  private final OMMetadataManager metadataManager;
+
+  /**
+   * Constructs BucketManager.
+   * @param metadataManager
+   */
+  public BucketManagerImpl(OMMetadataManager metadataManager){
+    this.metadataManager = metadataManager;
+  }
+
+  /**
+   * MetadataDB is maintained in MetadataManager and shared between
+   * BucketManager and VolumeManager. (and also by KeyManager)
+   *
+   * BucketManager uses MetadataDB to store bucket level information.
+   *
+   * Keys used in BucketManager for storing data into MetadataDB
+   * for BucketInfo:
+   * {volume/bucket} -> bucketInfo
+   *
+   * Work flow of create bucket:
+   *
+   * -> Check if the Volume exists in metadataDB, if not throw
+   * VolumeNotFoundException.
+   * -> Else check if the Bucket exists in metadataDB, if so throw
+   * BucketExistException
+ * -> Else update MetadataDB with BucketInfo.
+   */
+
+  /**
+   * Creates a bucket.
+   * @param bucketInfo - OmBucketInfo.
+   */
+  @Override
+  public void createBucket(OmBucketInfo bucketInfo) throws IOException {
+    Preconditions.checkNotNull(bucketInfo);
+    metadataManager.writeLock().lock();
+    String volumeName = bucketInfo.getVolumeName();
+    String bucketName = bucketInfo.getBucketName();
+    try {
+      byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
+      byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+
+      //Check if the volume exists
+      if (metadataManager.get(volumeKey) == null) {
+        LOG.debug("volume: {} not found ", volumeName);
+        throw new OMException("Volume doesn't exist",
+            OMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
+      }
+      //Check if bucket already exists
+      if (metadataManager.get(bucketKey) != null) {
+        LOG.debug("bucket: {} already exists ", bucketName);
+        throw new OMException("Bucket already exist",
+            OMException.ResultCodes.FAILED_BUCKET_ALREADY_EXISTS);
+      }
+
+      OmBucketInfo omBucketInfo = OmBucketInfo.newBuilder()
+          .setVolumeName(bucketInfo.getVolumeName())
+          .setBucketName(bucketInfo.getBucketName())
+          .setAcls(bucketInfo.getAcls())
+          .setStorageType(bucketInfo.getStorageType())
+          .setIsVersionEnabled(bucketInfo.getIsVersionEnabled())
+          .setCreationTime(Time.now())
+          .build();
+      metadataManager.put(bucketKey, omBucketInfo.getProtobuf().toByteArray());
+
+      LOG.debug("created bucket: {} in volume: {}", bucketName, volumeName);
+    } catch (IOException | DBException ex) {
+      if (!(ex instanceof OMException)) {
+        LOG.error("Bucket creation failed for bucket:{} in volume:{}",
+            bucketName, volumeName, ex);
+      }
+      throw ex;
+    } finally {
+      metadataManager.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Returns Bucket Information.
+   *
+   * @param volumeName - Name of the Volume.
+   * @param bucketName - Name of the Bucket.
+   */
+  @Override
+  public OmBucketInfo getBucketInfo(String volumeName, String bucketName)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    metadataManager.readLock().lock();
+    try {
+      byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+      byte[] value = metadataManager.get(bucketKey);
+      if (value == null) {
+        LOG.debug("bucket: {} not found in volume: {}.", bucketName,
+            volumeName);
+        throw new OMException("Bucket not found",
+            OMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
+      }
+      return OmBucketInfo.getFromProtobuf(BucketInfo.parseFrom(value));
+    } catch (IOException | DBException ex) {
+      if (!(ex instanceof OMException)) {
+        LOG.error("Exception while getting bucket info for bucket: {}",
+            bucketName, ex);
+      }
+      throw ex;
+    } finally {
+      metadataManager.readLock().unlock();
+    }
+  }
+
+  /**
+   * Sets bucket property from args.
+   * @param args - BucketArgs.
+   * @throws IOException
+   */
+  @Override
+  public void setBucketProperty(OmBucketArgs args) throws IOException {
+    Preconditions.checkNotNull(args);
+    metadataManager.writeLock().lock();
+    String volumeName = args.getVolumeName();
+    String bucketName = args.getBucketName();
+    try {
+      byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+      //Check if volume exists
+      if(metadataManager.get(metadataManager.getVolumeKey(volumeName)) ==
+          null) {
+        LOG.debug("volume: {} not found ", volumeName);
+        throw new OMException("Volume doesn't exist",
+            OMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
+      }
+      byte[] value = metadataManager.get(bucketKey);
+      //Check if bucket exists
+      if(value == null) {
+        LOG.debug("bucket: {} not found ", bucketName);
+        throw new OMException("Bucket doesn't exist",
+            OMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
+      }
+      OmBucketInfo oldBucketInfo = OmBucketInfo.getFromProtobuf(
+          BucketInfo.parseFrom(value));
+      OmBucketInfo.Builder bucketInfoBuilder = OmBucketInfo.newBuilder();
+      bucketInfoBuilder.setVolumeName(oldBucketInfo.getVolumeName())
+          .setBucketName(oldBucketInfo.getBucketName());
+
+      //Check ACLs to update
+      if(args.getAddAcls() != null || args.getRemoveAcls() != null) {
+        bucketInfoBuilder.setAcls(getUpdatedAclList(oldBucketInfo.getAcls(),
+            args.getRemoveAcls(), args.getAddAcls()));
+        LOG.debug("Updating ACLs for bucket: {} in volume: {}",
+            bucketName, volumeName);
+      } else {
+        bucketInfoBuilder.setAcls(oldBucketInfo.getAcls());
+      }
+
+      //Check StorageType to update
+      StorageType storageType = args.getStorageType();
+      if (storageType != null) {
+        bucketInfoBuilder.setStorageType(storageType);
+        LOG.debug("Updating bucket storage type for bucket: {} in volume: {}",
+            bucketName, volumeName);
+      } else {
+        bucketInfoBuilder.setStorageType(oldBucketInfo.getStorageType());
+      }
+
+      //Check Versioning to update
+      Boolean versioning = args.getIsVersionEnabled();
+      if (versioning != null) {
+        bucketInfoBuilder.setIsVersionEnabled(versioning);
+        LOG.debug("Updating bucket versioning for bucket: {} in volume: {}",
+            bucketName, volumeName);
+      } else {
+        bucketInfoBuilder
+            .setIsVersionEnabled(oldBucketInfo.getIsVersionEnabled());
+      }
+      bucketInfoBuilder.setCreationTime(oldBucketInfo.getCreationTime());
+
+      metadataManager.put(bucketKey,
+          bucketInfoBuilder.build().getProtobuf().toByteArray());
+    } catch (IOException | DBException ex) {
+      if (!(ex instanceof OMException)) {
+        LOG.error("Setting bucket property failed for bucket:{} in volume:{}",
+            bucketName, volumeName, ex);
+      }
+      throw ex;
+    } finally {
+      metadataManager.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Updates the existing ACL list with remove and add ACLs that are passed.
+   * Remove is done before Add.
+   *
+   * @param existingAcls - old ACL list.
+   * @param removeAcls - ACLs to be removed.
+   * @param addAcls - ACLs to be added.
+   * @return updated ACL list.
+   */
+  private List<OzoneAcl> getUpdatedAclList(List<OzoneAcl> existingAcls,
+      List<OzoneAcl> removeAcls, List<OzoneAcl> addAcls) {
+    if(removeAcls != null && !removeAcls.isEmpty()) {
+      existingAcls.removeAll(removeAcls);
+    }
+    if(addAcls != null && !addAcls.isEmpty()) {
+      addAcls.stream().filter(acl -> !existingAcls.contains(acl)).forEach(
+          existingAcls::add);
+    }
+    return existingAcls;
+  }
+
+  /**
+   * Deletes an existing empty bucket from volume.
+   * @param volumeName - Name of the volume.
+   * @param bucketName - Name of the bucket.
+   * @throws IOException
+   */
+  public void deleteBucket(String volumeName, String bucketName)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+    metadataManager.writeLock().lock();
+    try {
+      byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+      //Check if volume exists
+      if (metadataManager.get(metadataManager.getVolumeKey(volumeName))
+          == null) {
+        LOG.debug("volume: {} not found ", volumeName);
+        throw new OMException("Volume doesn't exist",
+            OMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
+      }
+      //Check if bucket exists
+      if (metadataManager.get(bucketKey) == null) {
+        LOG.debug("bucket: {} not found ", bucketName);
+        throw new OMException("Bucket doesn't exist",
+            OMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
+      }
+      //Check if bucket is empty
+      if (!metadataManager.isBucketEmpty(volumeName, bucketName)) {
+        LOG.debug("bucket: {} is not empty ", bucketName);
+        throw new OMException("Bucket is not empty",
+            OMException.ResultCodes.FAILED_BUCKET_NOT_EMPTY);
+      }
+      metadataManager.delete(bucketKey);
+    } catch (IOException ex) {
+      if (!(ex instanceof OMException)) {
+        LOG.error("Delete bucket failed for bucket:{} in volume:{}", bucketName,
+            volumeName, ex);
+      }
+      throw ex;
+    } finally {
+      metadataManager.writeLock().unlock();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<OmBucketInfo> listBuckets(String volumeName,
+                                        String startBucket, String bucketPrefix, int maxNumOfBuckets)
+      throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    metadataManager.readLock().lock();
+    try {
+      return metadataManager.listBuckets(
+          volumeName, startBucket, bucketPrefix, maxNumOfBuckets);
+    } finally {
+      metadataManager.readLock().unlock();
+    }
+  }
+}
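
One detail of getUpdatedAclList above is ordering: removals are applied before additions, and additions skip ACLs already present, so an ACL named in both lists ends up present exactly once. A standalone sketch of those semantics, with plain strings standing in for OzoneAcl (illustrative only):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    final class AclUpdateSketch {
      private AclUpdateSketch() { }

      // Same order of operations as getUpdatedAclList: remove first,
      // then add only the entries not already present.
      static List<String> updated(List<String> existing,
          List<String> remove, List<String> add) {
        List<String> result = new ArrayList<>(existing);
        if (remove != null) {
          result.removeAll(remove);
        }
        if (add != null) {
          add.stream().filter(a -> !result.contains(a)).forEach(result::add);
        }
        return result;
      }

      public static void main(String[] args) {
        // "bob" appears in both lists: removed first, then re-added.
        System.out.println(updated(
            Arrays.asList("alice", "bob"),
            Arrays.asList("bob"),
            Arrays.asList("bob", "carol")));  // [alice, bob, carol]
      }
    }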

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
new file mode 100644
index 0000000..ee23fe0
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.BackgroundService;
+import org.apache.hadoop.utils.BackgroundTask;
+import org.apache.hadoop.utils.BackgroundTaskQueue;
+import org.apache.hadoop.utils.BackgroundTaskResult;
+import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
+
+/**
+ * Background service that deletes keys. It periodically scans the OM
+ * metadata for keys with the "#deleting" prefix and asks SCM to delete
+ * the corresponding blocks; if SCM returns success for a key, that key
+ * is then cleaned up from the OM DB.
+ */
+public class KeyDeletingService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(KeyDeletingService.class);
+
+  // The thread pool size for key deleting service.
+  private final static int KEY_DELETING_CORE_POOL_SIZE = 2;
+
+  private final ScmBlockLocationProtocol scmClient;
+  private final KeyManager manager;
+  private final int keyLimitPerTask;
+
+  public KeyDeletingService(ScmBlockLocationProtocol scmClient,
+      KeyManager manager, long serviceInterval,
+      long serviceTimeout, Configuration conf) {
+    super("KeyDeletingService", serviceInterval, TimeUnit.MILLISECONDS,
+        KEY_DELETING_CORE_POOL_SIZE, serviceTimeout);
+    this.scmClient = scmClient;
+    this.manager = manager;
+    this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK,
+        OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new KeyDeletingTask());
+    return queue;
+  }
+
+  /**
+   * A key deleting task scans the OM DB for a bounded number of
+   * pending-deletion keys and sends them, along with their associated
+   * blocks, to SCM for deletion. Once SCM confirms the keys are deleted
+   * (i.e. once SCM has persisted the block info in its deletedBlockLog),
+   * the task removes these keys from the OM DB.
+   */
+  private class KeyDeletingTask implements
+      BackgroundTask<BackgroundTaskResult> {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+      try {
+        long startTime = Time.monotonicNow();
+        List<BlockGroup> keyBlocksList = manager
+            .getPendingDeletionKeys(keyLimitPerTask);
+        if (keyBlocksList.size() > 0) {
+          LOG.info("Found {} to-delete keys in OM", keyBlocksList.size());
+          List<DeleteBlockGroupResult> results =
+              scmClient.deleteKeyBlocks(keyBlocksList);
+          for (DeleteBlockGroupResult result : results) {
+            if (result.isSuccess()) {
+              try {
+                // Purge key from OM DB.
+                manager.deletePendingDeletionKey(result.getObjectKey());
+                LOG.debug("Key {} deleted from OM DB", result.getObjectKey());
+              } catch (IOException e) {
+                // If a pending-deletion key fails to delete, log a warning
+                // and retain it in this state so that deletion can be
+                // attempted again in the next run.
+                LOG.warn("Failed to delete pending-deletion key {}",
+                    result.getObjectKey(), e);
+              }
+            } else {
+              // Key deletion failed, retry in next interval.
+              LOG.warn("Key {} deletion failed because some of the blocks"
+                  + " were failed to delete, failed blocks: {}",
+                  result.getObjectKey(),
+                  StringUtils.join(",", result.getFailedBlocks()));
+            }
+          }
+
+          if (!results.isEmpty()) {
+            LOG.info("Number of key deleted from OM DB: {},"
+                + " task elapsed time: {}ms",
+                results.size(), Time.monotonicNow() - startTime);
+          }
+
+          return results::size;
+        } else {
+          LOG.debug("No pending deletion key found in OM");
+        }
+      } catch (IOException e) {
+        LOG.error("Unable to get pending deletion keys, retry in"
+            + " next interval", e);
+      }
+      return EmptyTaskResult.newResult();
+    }
+  }
+}
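
The retry contract above is worth spelling out: a key whose blocks SCM could not delete keeps its #deleting# state and is simply picked up again on the next service interval; nothing is lost, only delayed. A toy standalone model of that contract (the key names and the simulated SCM responses are fabricated for illustration):

    import java.util.ArrayDeque;
    import java.util.Deque;

    final class RetryModel {
      public static void main(String[] args) {
        Deque<String> pending = new ArrayDeque<>();
        pending.add("#deleting#/vol1/bucket1/key1");
        pending.add("#deleting#/vol1/bucket1/key2");

        for (int interval = 1; !pending.isEmpty(); interval++) {
          int batch = pending.size();
          for (int i = 0; i < batch; i++) {
            String key = pending.poll();
            // Simulated SCM: key2 fails on the first interval only.
            boolean scmConfirmed = !key.endsWith("key2") || interval > 1;
            if (scmConfirmed) {
              System.out.println("interval " + interval + ": purged " + key);
            } else {
              System.out.println("interval " + interval + ": retaining " + key);
              pending.add(key);  // retained; retried on the next interval
            }
          }
        }
      }
    }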

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
new file mode 100644
index 0000000..226c07d
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Handles key level commands.
+ */
+public interface KeyManager {
+
+  /**
+   * Start key manager.
+   */
+  void start();
+
+  /**
+   * Stop key manager.
+   */
+  void stop() throws IOException;
+
+  /**
+   * After calling commit, the key will be made visible. There can be multiple
+   * open key writes in parallel (identified by client id). The most recently
+   * committed one will be the one visible.
+   *
+   * @param args the key to commit.
+   * @param clientID the client that is committing.
+   * @throws IOException
+   */
+  void commitKey(OmKeyArgs args, int clientID) throws IOException;
+
+  /**
+   * A client calls this on an open key to request allocation of a new
+   * block, which is appended to the tail of the open key's current
+   * block list.
+   *
+   * @param args the key to append
+   * @param clientID the client requesting block.
+   * @return the reference to the new block.
+   * @throws IOException
+   */
+  OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID)
+      throws IOException;
+  /**
+   * Given the args of a key to put, write an open key entry to metadata.
+   *
+   * If the container creation or key write fails on
+   * DistributedStorageHandler, this key's metadata will still remain in OM.
+   * TODO garbage collect the open keys that never get closed
+   *
+   * @param args the args of the key provided by client.
+   * @return an OpenKeySession instance the client uses to talk to container.
+   * @throws IOException
+   */
+  OpenKeySession openKey(OmKeyArgs args) throws IOException;
+
+  /**
+   * Look up an existing key. Return the info of the key to client side, which
+   * DistributedStorageHandler will use to access the data on datanode.
+   *
+   * @param args the args of the key provided by client.
+   * @return an OmKeyInfo instance the client uses to talk to container.
+   * @throws IOException
+   */
+  OmKeyInfo lookupKey(OmKeyArgs args) throws IOException;
+
+  /**
+   * Renames an existing key within a bucket.
+   *
+   * @param args the args of the key provided by client.
+   * @param toKeyName New name to be used for the key
+   * @throws IOException if specified key doesn't exist or
+   * some other I/O errors while renaming the key.
+   */
+  void renameKey(OmKeyArgs args, String toKeyName) throws IOException;
+
+  /**
+   * Deletes an object by an object key. The key will be immediately removed
+   * from the OM namespace and become invisible to clients. The object data
+   * is removed asynchronously and may be retained for some time.
+   *
+   * @param args the args of the key provided by client.
+   * @throws IOException if specified key doesn't exist or
+   * some other I/O errors while deleting an object.
+   */
+  void deleteKey(OmKeyArgs args) throws IOException;
+
+  /**
+   * Returns a list of keys represented by {@link OmKeyInfo}
+   * in the given bucket.
+   *
+   * @param volumeName
+   *   the name of the volume.
+   * @param bucketName
+   *   the name of the bucket.
+   * @param startKey
+   *   the start key name, only the keys whose name is
+   *   after this value will be included in the result.
+   *   This key is excluded from the result.
+   * @param keyPrefix
+   *   key name prefix, only the keys whose name has
+   *   this prefix will be included in the result.
+   * @param maxKeys
+   *   the maximum number of keys to return. It ensures
+   *   the size of the result will not exceed this limit.
+   * @return a list of keys.
+   * @throws IOException
+   */
+  List<OmKeyInfo> listKeys(String volumeName,
+                           String bucketName, String startKey, String keyPrefix, int maxKeys)
+      throws IOException;
+
+  /**
+   * Returns a list of pending-deletion key info, up to the given count.
+   * Each entry is a {@link BlockGroup}, which contains the key name and
+   * all its associated block IDs. A pending-deletion key is stored with
+   * the #deleting# prefix in the OM DB.
+   *
+   * @param count max number of keys to return.
+   * @return a list of {@link BlockGroup} representing keys and blocks.
+   * @throws IOException
+   */
+  List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
+
+  /**
+   * Deletes a pending-deletion key by its name. This is often called when
+   * the key can be safely deleted from this layer. Once called, all
+   * footprints of the key will be purged from the OM DB.
+   *
+   * @param objectKeyName object key name with #deleting# prefix.
+   * @throws IOException if specified key doesn't exist or other I/O errors.
+   */
+  void deletePendingDeletionKey(String objectKeyName) throws IOException;
+
+  /**
+   * Returns a list of info for all still-open keys, each containing the
+   * key name and all its associated block IDs. A pending open key has
+   * the prefix #open# in the OM DB.
+   *
+   * @return a list of {@link BlockGroup} representing keys and blocks.
+   * @throws IOException
+   */
+  List<BlockGroup> getExpiredOpenKeys() throws IOException;
+
+  /**
+   * Deletes an expired open key by its name. Called when a hanging key
+   * has been lingering for too long. Once called, the open key entries
+   * are removed from OM metadata.
+   *
+   * @param objectKeyName object key name with #open# prefix.
+   * @throws IOException if specified key doesn't exist or other I/O errors.
+   */
+  void deleteExpiredOpenKey(String objectKeyName) throws IOException;
+}
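
Taken together, the write path this interface implies is openKey, then allocateBlock (zero or more times), then commitKey, with the key invisible to readers until the commit. A minimal client-side sketch of that flow (it assumes OpenKeySession exposes the client ID via a getId() accessor, which is not shown in this diff):

    import org.apache.hadoop.ozone.om.KeyManager;
    import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
    import org.apache.hadoop.ozone.om.helpers.OpenKeySession;

    import java.io.IOException;

    final class KeyWriteFlow {
      private KeyWriteFlow() { }

      // Open a key, grow it by one extra block, then commit to make it
      // visible; until commitKey the key cannot be read by clients.
      static void writeKey(KeyManager keys, OmKeyArgs args) throws IOException {
        OpenKeySession session = keys.openKey(args);  // not yet visible
        keys.allocateBlock(args, session.getId());    // append one block
        keys.commitKey(args, session.getId());        // now visible
      }
    }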

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
new file mode 100644
index 0000000..ba92a29
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -0,0 +1,566 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.protocol.proto
+    .OzoneManagerProtocolProtos.KeyInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.BackgroundService;
+import org.apache.hadoop.utils.BatchOperation;
+import org.iq80.leveldb.DBException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone
+    .OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT;
+import static org.apache.hadoop.ozone
+    .OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
+import static org.apache.hadoop.ozone
+    .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone
+    .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone
+    .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone
+    .OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone
+    .OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE;
+import static org.apache.hadoop.ozone
+    .OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS_DEFAULT;
+import static org.apache.hadoop.ozone
+    .OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone
+    .OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB;
+import org.apache.hadoop.hdds.protocol
+    .proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol
+    .proto.HddsProtos.ReplicationFactor;
+
+
+/**
+ * Implementation of keyManager.
+ */
+public class KeyManagerImpl implements KeyManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(KeyManagerImpl.class);
+
+  /**
+   * A SCM block client, used to talk to SCM to allocate block during putKey.
+   */
+  private final ScmBlockLocationProtocol scmBlockClient;
+  private final OMMetadataManager metadataManager;
+  private final long scmBlockSize;
+  private final boolean useRatis;
+  private final BackgroundService keyDeletingService;
+  private final BackgroundService openKeyCleanupService;
+
+  private final long preallocateMax;
+  private final Random random;
+  private final String omId;
+
+  public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
+      OMMetadataManager metadataManager, OzoneConfiguration conf,
+      String omId) {
+    this.scmBlockClient = scmBlockClient;
+    this.metadataManager = metadataManager;
+    this.scmBlockSize = conf.getLong(OZONE_SCM_BLOCK_SIZE_IN_MB,
+        OZONE_SCM_BLOCK_SIZE_DEFAULT) * OzoneConsts.MB;
+    this.useRatis = conf.getBoolean(DFS_CONTAINER_RATIS_ENABLED_KEY,
+        DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
+    long blockDeleteInterval = conf.getTimeDuration(
+        OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
+        OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    long serviceTimeout = conf.getTimeDuration(
+        OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
+        OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    this.preallocateMax = conf.getLong(
+        OZONE_KEY_PREALLOCATION_MAXSIZE,
+        OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT);
+    keyDeletingService = new KeyDeletingService(
+        scmBlockClient, this, blockDeleteInterval, serviceTimeout, conf);
+    int openkeyCheckInterval = conf.getInt(
+        OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS,
+        OZONE_OPEN_KEY_CLEANUP_SERVICE_INTERVAL_SECONDS_DEFAULT);
+    openKeyCleanupService = new OpenKeyCleanupService(
+        scmBlockClient, this, openkeyCheckInterval, serviceTimeout);
+    random = new Random();
+    this.omId = omId;
+  }
+
+  @VisibleForTesting
+  public BackgroundService getOpenKeyCleanupService() {
+    return openKeyCleanupService;
+  }
+
+  @Override
+  public void start() {
+    keyDeletingService.start();
+    openKeyCleanupService.start();
+  }
+
+  @Override
+  public void stop() throws IOException {
+    keyDeletingService.shutdown();
+    openKeyCleanupService.shutdown();
+  }
+
+  private void validateBucket(String volumeName, String bucketName)
+      throws IOException {
+    byte[] volumeKey = metadataManager.getVolumeKey(volumeName);
+    byte[] bucketKey = metadataManager.getBucketKey(volumeName, bucketName);
+
+    // Check if the volume exists
+    if (metadataManager.get(volumeKey) == null) {
+      LOG.error("volume not found: {}", volumeName);
+      throw new OMException("Volume not found",
+          OMException.ResultCodes.FAILED_VOLUME_NOT_FOUND);
+    }
+    // Check if the bucket exists
+    if (metadataManager.get(bucketKey) == null) {
+      LOG.error("bucket not found: {}/{} ", volumeName, bucketName);
+      throw new OMException("Bucket not found",
+          OMException.ResultCodes.FAILED_BUCKET_NOT_FOUND);
+    }
+  }
+
+  @Override
+  public OmKeyLocationInfo allocateBlock(OmKeyArgs args, int clientID)
+      throws IOException {
+    Preconditions.checkNotNull(args);
+    metadataManager.writeLock().lock();
+    String volumeName = args.getVolumeName();
+    String bucketName = args.getBucketName();
+    String keyName = args.getKeyName();
+
+    try {
+      validateBucket(volumeName, bucketName);
+      String objectKey = metadataManager.getKeyWithDBPrefix(
+          volumeName, bucketName, keyName);
+      byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, clientID);
+      byte[] keyData = metadataManager.get(openKey);
+      if (keyData == null) {
+        LOG.error("Allocate block for a key not in open status in meta store " +
+            objectKey + " with ID " + clientID);
+        throw new OMException("Open Key not found",
+            OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
+      }
+      OmKeyInfo keyInfo =
+          OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(keyData));
+      AllocatedBlock allocatedBlock =
+          scmBlockClient.allocateBlock(scmBlockSize, keyInfo.getType(),
+              keyInfo.getFactor(), omId);
+      OmKeyLocationInfo info = new OmKeyLocationInfo.Builder()
+          .setBlockID(allocatedBlock.getBlockID())
+          .setShouldCreateContainer(allocatedBlock.getCreateContainer())
+          .setLength(scmBlockSize)
+          .setOffset(0)
+          .build();
+      // The current version is not committed yet, so new blocks arriving
+      // now are added to the same version.
+      keyInfo.appendNewBlocks(Collections.singletonList(info));
+      keyInfo.updateModifcationTime();
+      metadataManager.put(openKey, keyInfo.getProtobuf().toByteArray());
+      return info;
+    } finally {
+      metadataManager.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public OpenKeySession openKey(OmKeyArgs args) throws IOException {
+    Preconditions.checkNotNull(args);
+    metadataManager.writeLock().lock();
+    String volumeName = args.getVolumeName();
+    String bucketName = args.getBucketName();
+    String keyName = args.getKeyName();
+    ReplicationFactor factor = args.getFactor();
+    ReplicationType type = args.getType();
+
+    // If user does not specify a replication strategy or
+    // replication factor, OM will use defaults.
+    if (factor == null) {
+      factor = useRatis ? ReplicationFactor.THREE : ReplicationFactor.ONE;
+    }
+
+    if (type == null) {
+      type = useRatis ? ReplicationType.RATIS : ReplicationType.STAND_ALONE;
+    }
+
+    try {
+      validateBucket(volumeName, bucketName);
+      long requestedSize = Math.min(preallocateMax, args.getDataSize());
+      List<OmKeyLocationInfo> locations = new ArrayList<>();
+      String objectKey = metadataManager.getKeyWithDBPrefix(
+          volumeName, bucketName, keyName);
+      // The requested size is not required; it is an optimization: SCM
+      // looks at the requested size and, if it is 0, allocates no block at
+      // this point; the client can always call allocateBlock later. If the
+      // requested size is not 0, OM preallocates blocks and piggybacks
+      // them to the client, saving RPC calls.
+      while (requestedSize > 0) {
+        long allocateSize = Math.min(scmBlockSize, requestedSize);
+        AllocatedBlock allocatedBlock =
+            scmBlockClient.allocateBlock(allocateSize, type, factor, omId);
+        OmKeyLocationInfo subKeyInfo = new OmKeyLocationInfo.Builder()
+            .setBlockID(allocatedBlock.getBlockID())
+            .setShouldCreateContainer(allocatedBlock.getCreateContainer())
+            .setLength(allocateSize)
+            .setOffset(0)
+            .build();
+        locations.add(subKeyInfo);
+        requestedSize -= allocateSize;
+      }
+      // NOTE: the size of a key is not a hard limit on anything; it is the
+      // value the client should expect as the current size of the key. If
+      // the client sets a value, that value is used; otherwise we allocate
+      // a single block, which becomes the current size when read back.
+      long size = args.getDataSize() >= 0 ? args.getDataSize() : scmBlockSize;
+      byte[] keyKey = metadataManager.getDBKeyBytes(
+          volumeName, bucketName, keyName);
+      byte[] value = metadataManager.get(keyKey);
+      OmKeyInfo keyInfo;
+      long openVersion;
+      if (value != null) {
+        // the key already exists; the new blocks will be added as a new version
+        keyInfo = OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(value));
+        // when locations.size() == 0, the new version has the same blocks
+        // as its previous version
+        openVersion = keyInfo.addNewVersion(locations);
+        keyInfo.setDataSize(size + keyInfo.getDataSize());
+      } else {
+        // the key does not exist; create a new object whose blocks form
+        // version 0
+        long currentTime = Time.now();
+        keyInfo = new OmKeyInfo.Builder()
+            .setVolumeName(args.getVolumeName())
+            .setBucketName(args.getBucketName())
+            .setKeyName(args.getKeyName())
+            .setOmKeyLocationInfos(Collections.singletonList(
+                new OmKeyLocationInfoGroup(0, locations)))
+            .setCreationTime(currentTime)
+            .setModificationTime(currentTime)
+            .setDataSize(size)
+            .setReplicationType(type)
+            .setReplicationFactor(factor)
+            .build();
+        openVersion = 0;
+      }
+      // Generate a random ID that is not already in the meta DB.
+      int id = -1;
+      // In general this should succeed within a couple of iterations; the
+      // arbitrarily large bound only guards against an infinite loop.
+      for (int j = 0; j < 10000; j++) {
+        id = random.nextInt();
+        byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, id);
+        if (metadataManager.get(openKey) == null) {
+          metadataManager.put(openKey, keyInfo.getProtobuf().toByteArray());
+          break;
+        }
+      }
+      if (id == -1) {
+        throw new IOException("Failed to find a usable id for " + objectKey);
+      }
+      LOG.debug("Key {} allocated in volume {} bucket {}",
+          keyName, volumeName, bucketName);
+      return new OpenKeySession(id, keyInfo, openVersion);
+    } catch (OMException e) {
+      throw e;
+    } catch (IOException ex) {
+      // ex cannot be an OMException here; that case is rethrown above.
+      LOG.error("Key open failed for volume:{} bucket:{} key:{}",
+          volumeName, bucketName, keyName, ex);
+      throw new OMException(ex.getMessage(),
+          OMException.ResultCodes.FAILED_KEY_ALLOCATION);
+    } finally {
+      metadataManager.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public void commitKey(OmKeyArgs args, int clientID) throws IOException {
+    Preconditions.checkNotNull(args);
+    metadataManager.writeLock().lock();
+    String volumeName = args.getVolumeName();
+    String bucketName = args.getBucketName();
+    String keyName = args.getKeyName();
+    try {
+      validateBucket(volumeName, bucketName);
+      String objectKey = metadataManager.getKeyWithDBPrefix(
+          volumeName, bucketName, keyName);
+      byte[] objectKeyBytes = metadataManager.getDBKeyBytes(volumeName,
+          bucketName, keyName);
+      byte[] openKey = metadataManager.getOpenKeyNameBytes(objectKey, clientID);
+      byte[] openKeyData = metadataManager.get(openKey);
+      if (openKeyData == null) {
+        throw new OMException("Commit a key without corresponding entry " +
+            DFSUtil.bytes2String(openKey), ResultCodes.FAILED_KEY_NOT_FOUND);
+      }
+      OmKeyInfo keyInfo =
+          OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(openKeyData));
+      keyInfo.setDataSize(args.getDataSize());
+      keyInfo.setModificationTime(Time.now());
+      BatchOperation batch = new BatchOperation();
+      batch.delete(openKey);
+      batch.put(objectKeyBytes, keyInfo.getProtobuf().toByteArray());
+      metadataManager.writeBatch(batch);
+    } catch (OMException e) {
+      throw e;
+    } catch (IOException ex) {
+      LOG.error("Key commit failed for volume:{} bucket:{} key:{}",
+          volumeName, bucketName, keyName, ex);
+      throw new OMException(ex.getMessage(),
+          OMException.ResultCodes.FAILED_KEY_ALLOCATION);
+    } finally {
+      metadataManager.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
+    Preconditions.checkNotNull(args);
+    metadataManager.writeLock().lock();
+    String volumeName = args.getVolumeName();
+    String bucketName = args.getBucketName();
+    String keyName = args.getKeyName();
+    try {
+      byte[] keyKey = metadataManager.getDBKeyBytes(
+          volumeName, bucketName, keyName);
+      byte[] value = metadataManager.get(keyKey);
+      if (value == null) {
+        LOG.debug("volume:{} bucket:{} Key:{} not found",
+            volumeName, bucketName, keyName);
+        throw new OMException("Key not found",
+            OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
+      }
+      return OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(value));
+    } catch (DBException ex) {
+      LOG.error("Get key failed for volume:{} bucket:{} key:{}",
+          volumeName, bucketName, keyName, ex);
+      throw new OMException(ex.getMessage(),
+          OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
+    } finally {
+      metadataManager.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public void renameKey(OmKeyArgs args, String toKeyName) throws IOException {
+    Preconditions.checkNotNull(args);
+    Preconditions.checkNotNull(toKeyName);
+    String volumeName = args.getVolumeName();
+    String bucketName = args.getBucketName();
+    String fromKeyName = args.getKeyName();
+    if (toKeyName.isEmpty() || fromKeyName.isEmpty()) {
+      LOG.error("Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}.",
+          volumeName, bucketName, fromKeyName, toKeyName);
+      throw new OMException("Key name is empty",
+          ResultCodes.FAILED_INVALID_KEY_NAME);
+    }
+
+    metadataManager.writeLock().lock();
+    try {
+      // fromKeyName should exist
+      byte[] fromKey = metadataManager.getDBKeyBytes(
+          volumeName, bucketName, fromKeyName);
+      byte[] fromKeyValue = metadataManager.get(fromKey);
+      if (fromKeyValue == null) {
+        // TODO: Add support for renaming open key
+        LOG.error(
+            "Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}. "
+                + "Key: {} not found.", volumeName, bucketName, fromKeyName,
+            toKeyName, fromKeyName);
+        throw new OMException("Key not found",
+            OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
+      }
+
+      // Renaming a key to itself is a no-op. Check this before the
+      // existence check below, which would otherwise reject it.
+      if (fromKeyName.equals(toKeyName)) {
+        return;
+      }
+
+      // toKeyName should not exist
+      byte[] toKey =
+          metadataManager.getDBKeyBytes(volumeName, bucketName, toKeyName);
+      byte[] toKeyValue = metadataManager.get(toKey);
+      if (toKeyValue != null) {
+        LOG.error(
+            "Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}. "
+                + "Key: {} already exists.", volumeName, bucketName,
+            fromKeyName, toKeyName, toKeyName);
+        throw new OMException("Key already exists",
+            OMException.ResultCodes.FAILED_KEY_ALREADY_EXISTS);
+      }
+
+      OmKeyInfo newKeyInfo =
+          OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(fromKeyValue));
+      newKeyInfo.setKeyName(toKeyName);
+      newKeyInfo.updateModifcationTime();
+      BatchOperation batch = new BatchOperation();
+      batch.delete(fromKey);
+      batch.put(toKey, newKeyInfo.getProtobuf().toByteArray());
+      metadataManager.writeBatch(batch);
+    } catch (DBException ex) {
+      LOG.error("Rename key failed for volume:{} bucket:{} fromKey:{} toKey:{}.",
+          volumeName, bucketName, fromKeyName, toKeyName, ex);
+      throw new OMException(ex.getMessage(),
+          ResultCodes.FAILED_KEY_RENAME);
+    } finally {
+      metadataManager.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public void deleteKey(OmKeyArgs args) throws IOException {
+    Preconditions.checkNotNull(args);
+    metadataManager.writeLock().lock();
+    String volumeName = args.getVolumeName();
+    String bucketName = args.getBucketName();
+    String keyName = args.getKeyName();
+    try {
+      byte[] objectKey = metadataManager.getDBKeyBytes(
+          volumeName, bucketName, keyName);
+      byte[] objectValue = metadataManager.get(objectKey);
+      if (objectValue == null) {
+        throw new OMException("Key not found",
+            OMException.ResultCodes.FAILED_KEY_NOT_FOUND);
+      }
+      byte[] deletingKey = metadataManager.getDeletedKeyName(objectKey);
+      BatchOperation batch = new BatchOperation();
+      batch.put(deletingKey, objectValue);
+      batch.delete(objectKey);
+      metadataManager.writeBatch(batch);
+    } catch (DBException ex) {
+      LOG.error(String.format("Delete key failed for volume:%s "
+          + "bucket:%s key:%s", volumeName, bucketName, keyName), ex);
+      throw new OMException(ex.getMessage(), ex,
+          ResultCodes.FAILED_KEY_DELETION);
+    } finally {
+      metadataManager.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
+      String startKey, String keyPrefix, int maxKeys) throws IOException {
+    Preconditions.checkNotNull(volumeName);
+    Preconditions.checkNotNull(bucketName);
+
+    metadataManager.readLock().lock();
+    try {
+      return metadataManager.listKeys(volumeName, bucketName,
+          startKey, keyPrefix, maxKeys);
+    } finally {
+      metadataManager.readLock().unlock();
+    }
+  }
+
+  @Override
+  public List<BlockGroup> getPendingDeletionKeys(final int count)
+      throws IOException {
+    metadataManager.readLock().lock();
+    try {
+      return metadataManager.getPendingDeletionKeys(count);
+    } finally {
+      metadataManager.readLock().unlock();
+    }
+  }
+
+  @Override
+  public void deletePendingDeletionKey(String objectKeyName)
+      throws IOException {
+    Preconditions.checkNotNull(objectKeyName);
+    if (!objectKeyName.startsWith(OzoneConsts.DELETING_KEY_PREFIX)) {
+      throw new IllegalArgumentException("Invalid key name,"
+          + " the name should be the key name with deleting prefix");
+    }
+
+    // Simply removes the entry from OM DB.
+    metadataManager.writeLock().lock();
+    try {
+      byte[] pendingDelKey = DFSUtil.string2Bytes(objectKeyName);
+      byte[] delKeyValue = metadataManager.get(pendingDelKey);
+      if (delKeyValue == null) {
+        throw new IOException("Failed to delete key " + objectKeyName
+            + " because it is not found in DB");
+      }
+      metadataManager.delete(pendingDelKey);
+    } finally {
+      metadataManager.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public List<BlockGroup> getExpiredOpenKeys() throws IOException {
+    metadataManager.readLock().lock();
+    try {
+      return metadataManager.getExpiredOpenKeys();
+    } finally {
+      metadataManager.readLock().unlock();
+    }
+  }
+
+  @Override
+  public void deleteExpiredOpenKey(String objectKeyName) throws IOException {
+    Preconditions.checkNotNull(objectKeyName);
+    if (!objectKeyName.startsWith(OzoneConsts.OPEN_KEY_PREFIX)) {
+      throw new IllegalArgumentException("Invalid key name,"
+          + " the name should be the key name with open key prefix");
+    }
+
+    // Simply removes the entry from OM DB.
+    metadataManager.writeLock().lock();
+    try {
+      byte[] openKey = DFSUtil.string2Bytes(objectKeyName);
+      byte[] delKeyValue = metadataManager.get(openKey);
+      if (delKeyValue == null) {
+        throw new IOException("Failed to delete key " + objectKeyName
+            + " because it is not found in DB");
+      }
+      metadataManager.delete(openKey);
+    } finally {
+      metadataManager.writeLock().unlock();
+    }
+  }
+}
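
For reference, the client-facing lifecycle implemented above is openKey, then
allocateBlock as the data outgrows the preallocated blocks, then commitKey.
Below is a minimal sketch of a caller; the OmKeyArgs builder methods and the
OpenKeySession#getId() accessor are assumptions based on the helpers this
patch uses, not verbatim API:

    // Hypothetical caller of KeyManager; builder/getter names are assumed.
    OmKeyArgs args = new OmKeyArgs.Builder()
        .setVolumeName("vol1")
        .setBucketName("bucket1")
        .setKeyName("key1")
        .setDataSize(4 * OzoneConsts.MB)   // drives block preallocation
        .build();

    OpenKeySession session = keyManager.openKey(args);     // preallocates
    OmKeyLocationInfo extra =
        keyManager.allocateBlock(args, session.getId());   // on demand
    keyManager.commitKey(args, session.getId());           // open -> committed

Note that commitKey deletes the #open# entry and writes the committed key in
a single BatchOperation, so a crash between the two steps cannot leave both
entries behind.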

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMXBean.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMXBean.java
new file mode 100644
index 0000000..3ab9f47
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMXBean.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.server.ServiceRuntimeInfo;
+
+/**
+ * This is the JMX management interface for OM information.
+ */
+@InterfaceAudience.Private
+public interface OMMXBean extends ServiceRuntimeInfo {
+
+  String getRpcPort();
+}
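
A typical consumer registers this bean alongside the RPC server. A hedged
sketch using Hadoop's standard MBeans helper; the service and bean names
below are assumptions for illustration:

    import javax.management.ObjectName;
    import org.apache.hadoop.metrics2.util.MBeans;

    // Inside OzoneManager, which would implement OMMXBean:
    ObjectName omInfoBeanName =
        MBeans.register("OzoneManager", "OzoneManagerInfo", this);
    // ... and on shutdown:
    MBeans.unregister(omInfoBeanName);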

http://git-wip-us.apache.org/repos/asf/hadoop/blob/061b1685/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
new file mode 100644
index 0000000..f2e78e6
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataStore;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * OM metadata manager interface.
+ */
+public interface OMMetadataManager {
+  /**
+   * Start metadata manager.
+   */
+  void start();
+
+  /**
+   * Stop metadata manager.
+   */
+  void stop() throws IOException;
+
+  /**
+   * Get metadata store.
+   * @return metadata store.
+   */
+  @VisibleForTesting
+  MetadataStore getStore();
+
+  /**
+   * Returns the read lock used on Metadata DB.
+   * @return readLock
+   */
+  Lock readLock();
+
+  /**
+   * Returns the write lock used on Metadata DB.
+   * @return writeLock
+   */
+  Lock writeLock();
+
+  /**
+   * Returns the value associated with this key.
+   * @param key - key
+   * @return value
+   */
+  byte[] get(byte[] key) throws IOException;
+
+  /**
+   * Puts a Key into Metadata DB.
+   * @param key   - key
+   * @param value - value
+   */
+  void put(byte[] key, byte[] value) throws IOException;
+
+  /**
+   * Deletes a Key from Metadata DB.
+   * @param key   - key
+   */
+  void delete(byte[] key) throws IOException;
+
+  /**
+   * Atomically writes a batch of operations.
+   * @param batch - batch of operations to apply
+   * @throws IOException
+   */
+  void writeBatch(BatchOperation batch) throws IOException;
+
+  /**
+   * Given a volume return the corresponding DB key.
+   * @param volume - Volume name
+   */
+  byte[] getVolumeKey(String volume);
+
+  /**
+   * Given a user return the corresponding DB key.
+   * @param user - User name
+   */
+  byte[] getUserKey(String user);
+
+  /**
+   * Given a volume and bucket, return the corresponding DB key.
+   * @param volume - Volume name
+   * @param bucket - Bucket name
+   */
+  byte[] getBucketKey(String volume, String bucket);
+
+  /**
+   * Given a volume, bucket and a key, return the corresponding DB key.
+   * @param volume - volume name
+   * @param bucket - bucket name
+   * @param key - key name
+   * @return bytes of DB key.
+   */
+  byte[] getDBKeyBytes(String volume, String bucket, String key);
+
+  /**
+   * Returns the DB key name of a deleted key in OM metadata store.
+   * The name for a deleted key has prefix #deleting# followed by
+   * the actual key name.
+   * @param keyName - key name
+   * @return bytes of DB key.
+   */
+  byte[] getDeletedKeyName(byte[] keyName);
+
+  /**
+   * Returns the DB key name of an open key in the OM metadata store:
+   * the #open# prefix followed by the actual key name.
+   * @param keyName - key name
+   * @param id - the id for this open
+   * @return bytes of DB key.
+   */
+  byte[] getOpenKeyNameBytes(String keyName, int id);
+
+  /**
+   * Returns the full name of a key, given its volume, bucket and key
+   * names, generally by joining them with delimiters.
+   *
+   * @param volumeName - volume name
+   * @param bucketName - bucket name
+   * @param keyName - key name
+   * @return the full key name.
+   */
+  String getKeyWithDBPrefix(String volumeName, String bucketName,
+      String keyName);
+
+  /**
+   * Given a volume, check if it is empty,
+   * i.e. there are no buckets in it.
+   * @param volume - Volume name
+   */
+  boolean isVolumeEmpty(String volume) throws IOException;
+
+  /**
+   * Given a volume/bucket, check if it is empty,
+   * i.e. there are no keys in it.
+   * @param volume - Volume name
+   * @param  bucket - Bucket name
+   * @return true if the bucket is empty
+   */
+  boolean isBucketEmpty(String volume, String bucket) throws IOException;
+
+  /**
+   * Returns a list of buckets represented by {@link OmBucketInfo}
+   * in the given volume.
+   *
+   * @param volumeName
+   *   the name of the volume. This argument is required,
+   *   this method returns buckets in this given volume.
+   * @param startBucket
+   *   the start bucket name. Only the buckets whose name is
+   *   after this value will be included in the result.
+   *   This bucket is excluded from the result.
+   * @param bucketPrefix
+   *   bucket name prefix. Only the buckets whose name has
+   *   this prefix will be included in the result.
+   * @param maxNumOfBuckets
+   *   the maximum number of buckets to return. It ensures
+   *   the size of the result will not exceed this limit.
+   * @return a list of buckets.
+   * @throws IOException
+   */
+  List<OmBucketInfo> listBuckets(String volumeName, String startBucket,
+      String bucketPrefix, int maxNumOfBuckets) throws IOException;
+
+  /**
+   * Returns a list of keys represented by {@link OmKeyInfo}
+   * in the given bucket.
+   *
+   * @param volumeName
+   *   the name of the volume.
+   * @param bucketName
+   *   the name of the bucket.
+   * @param startKey
+   *   the start key name, only the keys whose name is
+   *   after this value will be included in the result.
+   *   This key is excluded from the result.
+   * @param keyPrefix
+   *   key name prefix, only the keys whose name has
+   *   this prefix will be included in the result.
+   * @param maxKeys
+   *   the maximum number of keys to return. It ensures
+   *   the size of the result will not exceed this limit.
+   * @return a list of keys.
+   * @throws IOException
+   */
+  List<OmKeyInfo> listKeys(String volumeName, String bucketName,
+      String startKey, String keyPrefix, int maxKeys)
+      throws IOException;
+
+  /**
+   * Returns a list of volumes owned by a given user; if user is null,
+   * returns all volumes.
+   *
+   * @param userName
+   *   volume owner
+   * @param prefix
+   *   the volume prefix used to filter the listing result.
+   * @param startKey
+   *   the start volume name determines where to start listing from,
+   *   this key is excluded from the result.
+   * @param maxKeys
+   *   the maximum number of volumes to return.
+   * @return a list of {@link OmVolumeArgs}
+   * @throws IOException
+   */
+  List<OmVolumeArgs> listVolumes(String userName, String prefix,
+      String startKey, int maxKeys) throws IOException;
+
+  /**
+   * Returns a list of pending deletion key info, up to the given count.
+   * Each entry is a {@link BlockGroup}, which contains the key name and
+   * all its associated block IDs. A pending deletion key is stored with
+   * the #deleting# prefix in the OM DB.
+   *
+   * @param count max number of keys to return.
+   * @return a list of {@link BlockGroup} represent keys and blocks.
+   * @throws IOException
+   */
+  List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
+
+  /**
+   * Returns info for all expired open keys. Each entry contains the key
+   * name and all its associated block IDs. An open key is stored with the
+   * #open# prefix in the OM DB.
+   *
+   * @return a list of {@link BlockGroup} representing keys and blocks.
+   * @throws IOException
+   */
+  List<BlockGroup> getExpiredOpenKeys() throws IOException;
+}
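
The interface encodes key state in the DB key name itself, which is what
lets the background services find work with a simple prefix scan. A sketch
of how the helpers compose names; the rendered strings in the comments are
illustrative only, since the exact delimiters and prefix constants live in
OzoneConsts:

    byte[] dbKey = metadataManager.getDBKeyBytes("vol1", "bucket1", "key1");
    // e.g. "/vol1/bucket1/key1"

    byte[] deleting = metadataManager.getDeletedKeyName(dbKey);
    // e.g. "#deleting#/vol1/bucket1/key1" -- picked up by KeyDeletingService

    byte[] open = metadataManager.getOpenKeyNameBytes("/vol1/bucket1/key1", 42);
    // e.g. "#open#/vol1/bucket1/key1#42" -- picked up by OpenKeyCleanupService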


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org