You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2018/09/05 13:05:53 UTC
hadoop git commit: HDDS-358. Use DBStore and TableStore for
DeleteKeyService. Contributed by Anu Engineer
Repository: hadoop
Updated Branches:
refs/heads/trunk dffb7bfe6 -> df0d61e3a
HDDS-358. Use DBStore and TableStore for DeleteKeyService. Contributed by Anu Engineer
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/df0d61e3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/df0d61e3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/df0d61e3
Branch: refs/heads/trunk
Commit: df0d61e3a07a958fc6d71a910d928c5639011cd7
Parents: dffb7bf
Author: Nanda kumar <na...@apache.org>
Authored: Wed Sep 5 18:35:11 2018 +0530
Committer: Nanda kumar <na...@apache.org>
Committed: Wed Sep 5 18:35:33 2018 +0530
----------------------------------------------------------------------
.../hadoop/ozone/om/helpers/OmKeyArgs.java | 10 ++
.../hadoop/ozone/om/KeyDeletingService.java | 148 +++++++++------
.../org/apache/hadoop/ozone/om/KeyManager.java | 24 +--
.../apache/hadoop/ozone/om/KeyManagerImpl.java | 40 ++++-
.../hadoop/ozone/om/OmMetadataManagerImpl.java | 48 +++--
.../ozone/om/ScmBlockLocationTestIngClient.java | 178 +++++++++++++++++++
.../hadoop/ozone/om/TestKeyDeletingService.java | 164 +++++++++++++++++
7 files changed, 516 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0d61e3/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
index d8d41d5..e56ad7f 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
@@ -16,11 +16,13 @@
* limitations under the License.
*/
package org.apache.hadoop.ozone.om.helpers;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.audit.Auditable;
+import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -102,6 +104,14 @@ public final class OmKeyArgs implements Auditable {
return auditMap;
}
+ @VisibleForTesting
+ public void addLocationInfo(OmKeyLocationInfo locationInfo) {
+ if (this.locationInfoList == null) {
+ locationInfoList = new ArrayList<>();
+ }
+ locationInfoList.add(locationInfo);
+ }
+
/**
* Builder class of OmKeyArgs.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0d61e3/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
index ee23fe0..41a876b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
@@ -1,52 +1,54 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
*/
package org.apache.hadoop.ozone.om;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
-import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.utils.BackgroundService;
import org.apache.hadoop.utils.BackgroundTask;
import org.apache.hadoop.utils.BackgroundTaskQueue;
import org.apache.hadoop.utils.BackgroundTaskResult;
import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.utils.db.Table;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
/**
- * This is the background service to delete keys.
- * Scan the metadata of om periodically to get
- * the keys with prefix "#deleting" and ask scm to
- * delete metadata accordingly, if scm returns
- * success for keys, then clean up those keys.
+ * This is the background service to delete keys. Scan the metadata of om
+ * periodically to get the keys from DeletedTable and ask scm to delete
+ * metadata accordingly, if scm returns success for keys, then clean up those
+ * keys.
*/
public class KeyDeletingService extends BackgroundService {
-
private static final Logger LOG =
LoggerFactory.getLogger(KeyDeletingService.class);
@@ -56,6 +58,8 @@ public class KeyDeletingService extends BackgroundService {
private final ScmBlockLocationProtocol scmClient;
private final KeyManager manager;
private final int keyLimitPerTask;
+ private final AtomicLong deletedKeyCount;
+ private final AtomicLong runCount;
public KeyDeletingService(ScmBlockLocationProtocol scmClient,
KeyManager manager, long serviceInterval,
@@ -66,6 +70,28 @@ public class KeyDeletingService extends BackgroundService {
this.manager = manager;
this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK,
OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
+ this.deletedKeyCount = new AtomicLong(0);
+ this.runCount = new AtomicLong(0);
+ }
+
+ /**
+ * Returns the number of times this Background service has run.
+ *
+ * @return Long, run count.
+ */
+ @VisibleForTesting
+ public AtomicLong getRunCount() {
+ return runCount;
+ }
+
+ /**
+ * Returns the number of keys deleted by the background service.
+ *
+ * @return Long count.
+ */
+ @VisibleForTesting
+ public AtomicLong getDeletedKeyCount() {
+ return deletedKeyCount;
}
@Override
@@ -76,11 +102,11 @@ public class KeyDeletingService extends BackgroundService {
}
/**
- * A key deleting task scans OM DB and looking for a certain number
- * of pending-deletion keys, sends these keys along with their associated
- * blocks to SCM for deletion. Once SCM confirms keys are deleted (once
- * SCM persisted the blocks info in its deletedBlockLog), it removes
- * these keys from the DB.
+ * A key deleting task scans OM DB and looking for a certain number of
+ * pending-deletion keys, sends these keys along with their associated blocks
+ * to SCM for deletion. Once SCM confirms keys are deleted (once SCM persisted
+ * the blocks info in its deletedBlockLog), it removes these keys from the
+ * DB.
*/
private class KeyDeletingTask implements
BackgroundTask<BackgroundTaskResult> {
@@ -92,51 +118,55 @@ public class KeyDeletingService extends BackgroundService {
@Override
public BackgroundTaskResult call() throws Exception {
+ runCount.incrementAndGet();
try {
long startTime = Time.monotonicNow();
List<BlockGroup> keyBlocksList = manager
.getPendingDeletionKeys(keyLimitPerTask);
- if (keyBlocksList.size() > 0) {
- LOG.info("Found {} to-delete keys in OM", keyBlocksList.size());
+ if (keyBlocksList != null && keyBlocksList.size() > 0) {
List<DeleteBlockGroupResult> results =
scmClient.deleteKeyBlocks(keyBlocksList);
- for (DeleteBlockGroupResult result : results) {
- if (result.isSuccess()) {
- try {
- // Purge key from OM DB.
- manager.deletePendingDeletionKey(result.getObjectKey());
- LOG.debug("Key {} deleted from OM DB", result.getObjectKey());
- } catch (IOException e) {
- // if a pending deletion key is failed to delete,
- // print a warning here and retain it in this state,
- // so that it can be attempt to delete next time.
- LOG.warn("Failed to delete pending-deletion key {}",
- result.getObjectKey(), e);
- }
- } else {
- // Key deletion failed, retry in next interval.
- LOG.warn("Key {} deletion failed because some of the blocks"
- + " were failed to delete, failed blocks: {}",
- result.getObjectKey(),
- StringUtils.join(",", result.getFailedBlocks()));
- }
+ if (results != null) {
+ int delCount = deleteAllKeys(results);
+ LOG.debug("Number of keys deleted: {}, elapsed time: {}ms",
+ delCount, Time.monotonicNow() - startTime);
+ deletedKeyCount.addAndGet(delCount);
}
-
- if (!results.isEmpty()) {
- LOG.info("Number of key deleted from OM DB: {},"
- + " task elapsed time: {}ms",
- results.size(), Time.monotonicNow() - startTime);
- }
-
- return results::size;
- } else {
- LOG.debug("No pending deletion key found in OM");
}
} catch (IOException e) {
- LOG.error("Unable to get pending deletion keys, retry in"
- + " next interval", e);
+ LOG.error("Error while running delete keys background task. Will " +
+ "retry at next run.", e);
}
+ // By design, no one cares about the results of this callback.
return EmptyTaskResult.newResult();
}
+
+ /**
+ * Deletes all the keys that SCM has acknowledged and queued for delete.
+ *
+ * @param results DeleteBlockGroups returned by SCM.
+ * @throws RocksDBException on Error.
+ * @throws IOException on Error
+ */
+ private int deleteAllKeys(List<DeleteBlockGroupResult> results)
+ throws RocksDBException, IOException {
+ Table deletedTable = manager.getMetadataManager().getDeletedTable();
+ // Put all keys to delete in a single transaction and call for delete.
+ int deletedCount = 0;
+ try (WriteBatch writeBatch = new WriteBatch()) {
+ for (DeleteBlockGroupResult result : results) {
+ if (result.isSuccess()) {
+ // Purge key from OM DB.
+ writeBatch.delete(deletedTable.getHandle(),
+ DFSUtil.string2Bytes(result.getObjectKey()));
+ LOG.debug("Key {} deleted from OM DB", result.getObjectKey());
+ deletedCount++;
+ }
+ }
+ // Write a single transaction for delete.
+ manager.getMetadataManager().getStore().write(writeBatch);
+ }
+ return deletedCount;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0d61e3/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index a512d7b..83363e7 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.utils.BackgroundService;
import java.io.IOException;
import java.util.List;
@@ -144,16 +145,6 @@ public interface KeyManager {
List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
/**
- * Deletes a pending deletion key by its name. This is often called when
- * key can be safely deleted from this layer. Once called, all footprints
- * of the key will be purged from OM DB.
- *
- * @param objectKeyName object key name with #deleting# prefix.
- * @throws IOException if specified key doesn't exist or other I/O errors.
- */
- void deletePendingDeletionKey(String objectKeyName) throws IOException;
-
- /**
* Returns a list of all still open key info. Which contains the info about
* the key name and all its associated block IDs. A pending open key has
* prefix #open# in OM DB.
@@ -172,4 +163,17 @@ public interface KeyManager {
* @throws IOException if specified key doesn't exist or other I/O errors.
*/
void deleteExpiredOpenKey(String objectKeyName) throws IOException;
+
+ /**
+ * Returns the metadataManager.
+ * @return OMMetadataManager.
+ */
+ OMMetadataManager getMetadataManager();
+
+ /**
+ * Returns the instance of Deleting Service.
+ * @return Background service.
+ */
+ BackgroundService getDeletingService();
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0d61e3/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index d585523..06d2587 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.BackgroundService;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
@@ -43,9 +44,14 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
@@ -69,6 +75,8 @@ public class KeyManagerImpl implements KeyManager {
private final long preallocateMax;
private final String omId;
+ private final BackgroundService keyDeletingService;
+
public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
OMMetadataManager metadataManager,
OzoneConfiguration conf,
@@ -82,15 +90,28 @@ public class KeyManagerImpl implements KeyManager {
this.preallocateMax = conf.getLong(
OZONE_KEY_PREALLOCATION_MAXSIZE,
OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT);
+ long blockDeleteInterval = conf.getTimeDuration(
+ OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
+ OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ long serviceTimeout = conf.getTimeDuration(
+ OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
+ OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ keyDeletingService = new KeyDeletingService(
+ scmBlockClient, this, blockDeleteInterval, serviceTimeout, conf);
+
this.omId = omId;
}
@Override
public void start() {
+ keyDeletingService.start();
}
@Override
public void stop() throws IOException {
+ keyDeletingService.shutdown();
}
private void validateBucket(String volumeName, String bucketName)
@@ -460,14 +481,7 @@ public class KeyManagerImpl implements KeyManager {
@Override
public List<BlockGroup> getPendingDeletionKeys(final int count)
throws IOException {
- //TODO: Fix this in later patches.
- return null;
- }
-
- @Override
- public void deletePendingDeletionKey(String objectKeyName)
- throws IOException {
- // TODO : Fix in later patches.
+ return metadataManager.getPendingDeletionKeys(count);
}
@Override
@@ -485,4 +499,14 @@ public class KeyManagerImpl implements KeyManager {
Preconditions.checkNotNull(objectKeyName);
// TODO: Fix this in later patches.
}
+
+ @Override
+ public OMMetadataManager getMetadataManager() {
+ return metadataManager;
+ }
+
+ @Override
+ public BackgroundService getDeletingService() {
+ return keyDeletingService;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0d61e3/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 151fddf..16625dc 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -273,7 +273,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
@Override
public byte[] getOpenKeyBytes(String volume, String bucket,
- String key, long id) {
+ String key, long id) {
String openKey = OM_KEY_PREFIX + volume + OM_KEY_PREFIX + bucket +
OM_KEY_PREFIX + key + OM_KEY_PREFIX + id;
return DFSUtil.string2Bytes(openKey);
@@ -573,27 +573,37 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
}
@Override
- public List<BlockGroup> getPendingDeletionKeys(final int count)
+ public List<BlockGroup> getPendingDeletionKeys(final int keyCount)
throws IOException {
List<BlockGroup> keyBlocksList = Lists.newArrayList();
- // TODO: Fix this later, Not part of this patch.
- List<Map.Entry<byte[], byte[]>> rangeResult = Collections.emptyList();
- for (Map.Entry<byte[], byte[]> entry : rangeResult) {
- OmKeyInfo info =
- OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
- // Get block keys as a list.
- OmKeyLocationInfoGroup latest = info.getLatestVersionLocations();
- if (latest == null) {
- return Collections.emptyList();
+ try (TableIterator<Table.KeyValue> keyIter = getDeletedTable().iterator()) {
+ int currentCount = 0;
+ while (keyIter.hasNext() && currentCount < keyCount) {
+ Table.KeyValue kv = keyIter.next();
+ if (kv != null) {
+ OmKeyInfo info =
+ OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(kv.getValue()));
+ // Get block keys as a list.
+ OmKeyLocationInfoGroup latest = info.getLatestVersionLocations();
+ if (latest == null) {
+ // This means that we have a key without any blocks.
+ // BUG-BUG: if this happens the key will never be deleted.
+ // TODO: Right thing to do is to remove this key right here.
+ LOG.warn("Found a key without blocks: {}, skipping for now.",
+ DFSUtil.bytes2String(kv.getKey()));
+ continue;
+ }
+ List<BlockID> item = latest.getLocationList().stream()
+ .map(b -> new BlockID(b.getContainerID(), b.getLocalID()))
+ .collect(Collectors.toList());
+ BlockGroup keyBlocks = BlockGroup.newBuilder()
+ .setKeyName(DFSUtil.bytes2String(kv.getKey()))
+ .addAllBlockIDs(item)
+ .build();
+ keyBlocksList.add(keyBlocks);
+ currentCount++;
+ }
}
- List<BlockID> item = latest.getLocationList().stream()
- .map(b -> new BlockID(b.getContainerID(), b.getLocalID()))
- .collect(Collectors.toList());
- BlockGroup keyBlocks = BlockGroup.newBuilder()
- .setKeyName(DFSUtil.bytes2String(entry.getKey()))
- .addAllBlockIDs(item)
- .build();
- keyBlocksList.add(keyBlocks);
}
return keyBlocksList;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0d61e3/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java
new file mode 100644
index 0000000..2da60de
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.hdds.protocol.proto
+ .ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
+import static org.apache.hadoop.hdds.protocol.proto
+ .ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result.success;
+import static org.apache.hadoop.hdds.protocol.proto
+ .ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result.unknownFailure;
+
+/**
+ * This is a testing client that allows us to intercept calls from OzoneManager
+ * to SCM.
+ * <p>
+ * TODO: OzoneManager#getScmBlockClient -- so that we can load this class up via
+ * config setting into OzoneManager. Right now, we just pass this to
+ * KeyDeletingService only.
+ * <p>
+ * TODO: Move this class to a generic test utils so we can use this class in
+ * other Ozone Manager tests.
+ */
+public class ScmBlockLocationTestIngClient implements ScmBlockLocationProtocol {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ScmBlockLocationTestIngClient.class);
+ private final String clusterID;
+ private final String scmId;
+
+ // 0 means no calls will fail, +1 means all calls will fail, +2 means every
+ // second call will fail, +3 means every third and so on.
+ private final int failCallsFrequency;
+ private int currentCall = 0;
+
+ /**
+ * If ClusterID or SCMID is blank a per instance ID is generated.
+ *
+ * @param clusterID - String or blank.
+ * @param scmId - String or Blank.
+ * @param failCallsFrequency - Set to 0 for no failures, 1 to always fail,
+ * a positive number for that frequency of failure.
+ */
+ public ScmBlockLocationTestIngClient(String clusterID, String scmId,
+ int failCallsFrequency) {
+ this.clusterID = StringUtils.isNotBlank(clusterID) ? clusterID :
+ UUID.randomUUID().toString();
+ this.scmId = StringUtils.isNotBlank(scmId) ? scmId :
+ UUID.randomUUID().toString();
+ this.failCallsFrequency = Math.abs(failCallsFrequency);
+ switch (this.failCallsFrequency) {
+ case 0:
+ LOG.debug("Set to no failure mode, all delete block calls will " +
+ "succeed.");
+ break;
+ case 1:
+ LOG.debug("Set to all failure mode. All delete block calls to SCM" +
+ " will fail.");
+ break;
+ default:
+ LOG.debug("Set to Mix mode, every {} -th call will fail",
+ this.failCallsFrequency);
+ }
+
+ }
+
+ /**
+ * Returns Fake blocks to the KeyManager so we get blocks in the Database.
+ * @param size - size of the block.
+ * @param type Replication Type
+ * @param factor - Replication factor
+ * @param owner - String owner.
+ * @return
+ * @throws IOException
+ */
+ @Override
+ public AllocatedBlock allocateBlock(long size,
+ HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
+ String owner) throws IOException {
+ DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
+ Pipeline pipeline = createPipeline(datanodeDetails);
+ long containerID = Time.monotonicNow();
+ long localID = Time.monotonicNow();
+ AllocatedBlock.Builder abb =
+ new AllocatedBlock.Builder()
+ .setBlockID(new BlockID(containerID, localID))
+ .setPipeline(pipeline)
+ .setShouldCreateContainer(false);
+ return abb.build();
+ }
+
+ private Pipeline createPipeline(DatanodeDetails datanode) {
+ final Pipeline pipeline =
+ new Pipeline(datanode.getUuidString(), HddsProtos.LifeCycleState.OPEN,
+ HddsProtos.ReplicationType.STAND_ALONE,
+ HddsProtos.ReplicationFactor.ONE,
+ PipelineID.randomId());
+ pipeline.addMember(datanode);
+ return pipeline;
+ }
+
+ @Override
+ public List<DeleteBlockGroupResult> deleteKeyBlocks(
+ List<BlockGroup> keyBlocksInfoList) throws IOException {
+ List<DeleteBlockGroupResult> results = new ArrayList<>();
+ List<DeleteBlockResult> blockResultList = new ArrayList<>();
+ Result result;
+ for (BlockGroup keyBlocks : keyBlocksInfoList) {
+ for (BlockID blockKey : keyBlocks.getBlockIDList()) {
+ currentCall++;
+ switch (this.failCallsFrequency) {
+ case 0:
+ result = success;
+ break;
+ case 1:
+ result = unknownFailure;
+ break;
+ default:
+ if (currentCall % this.failCallsFrequency == 0) {
+ result = unknownFailure;
+ } else {
+ result = success;
+ }
+ }
+ blockResultList.add(new DeleteBlockResult(blockKey, result));
+ }
+ results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(),
+ blockResultList));
+ }
+ return results;
+ }
+
+ @Override
+ public ScmInfo getScmInfo() throws IOException {
+ ScmInfo.Builder builder =
+ new ScmInfo.Builder()
+ .setClusterId(clusterID)
+ .setScmId(scmId);
+ return builder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df0d61e3/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
new file mode 100644
index 0000000..44e3bdf
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyDeletingService.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.utils.db.DBConfigFromFile;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys
+ .HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+ .OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
+
+/**
+ * Test Key Deleting Service.
+ * <p>
+ * This test does the following things.
+ * <p>
+ * 1. Creates a bunch of keys. 2. Then executes delete key directly using
+ * Metadata Manager. 3. Waits for a while for the KeyDeleting Service to pick up
+ * and call into SCM. 4. Confirms that calls have been successful.
+ */
+public class TestKeyDeletingService {
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ private OzoneConfiguration createConfAndInitValues() throws IOException {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ File newFolder = folder.newFolder();
+ if (!newFolder.exists()) {
+ Assert.assertTrue(newFolder.mkdirs());
+ }
+ System.setProperty(DBConfigFromFile.CONFIG_DIR, "/");
+ ServerUtils.setOzoneMetaDirPath(conf, newFolder.toString());
+ conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 100,
+ TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
+ TimeUnit.MILLISECONDS);
+ conf.setQuietMode(false);
+
+ return conf;
+ }
+
+ /**
+ * In this test, we create a bunch of keys and delete them. Then we start the
+ * KeyDeletingService and pass a SCMClient which does not fail. We make sure
+ * that all the keys that we deleted are picked up and deleted by
+ * OzoneManager.
+ *
+ * @throws IOException - on Failure.
+ */
+
+ @Test(timeout = 30000)
+ public void checkIfDeleteServiceisDeletingKeys()
+ throws IOException, TimeoutException, InterruptedException {
+ OzoneConfiguration conf = createConfAndInitValues();
+ OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(conf);
+ KeyManager keyManager =
+ new KeyManagerImpl(
+ new ScmBlockLocationTestIngClient(null, null, 0),
+ metaMgr, conf, UUID.randomUUID().toString());
+ final int keyCount = 100;
+ createAndDeleteKeys(keyManager, keyCount);
+ KeyDeletingService keyDeletingService =
+ (KeyDeletingService) keyManager.getDeletingService();
+ keyManager.start();
+ GenericTestUtils.waitFor(
+ () -> keyDeletingService.getDeletedKeyCount().get() >= keyCount,
+ 1000, 10000);
+ Assert.assertTrue(keyDeletingService.getRunCount().get() > 1);
+ }
+
+ @Test(timeout = 30000)
+ public void checkIfDeleteServiceWithFailingSCM()
+ throws IOException, TimeoutException, InterruptedException {
+ OzoneConfiguration conf = createConfAndInitValues();
+ OmMetadataManagerImpl metaMgr = new OmMetadataManagerImpl(conf);
+ //failCallsFrequency = 1 , means all calls fail.
+ KeyManager keyManager =
+ new KeyManagerImpl(
+ new ScmBlockLocationTestIngClient(null, null, 1),
+ metaMgr, conf, UUID.randomUUID().toString());
+ final int keyCount = 100;
+ createAndDeleteKeys(keyManager, keyCount);
+ KeyDeletingService keyDeletingService =
+ (KeyDeletingService) keyManager.getDeletingService();
+ keyManager.start();
+ // Make sure that the background thread has run at least 5 times.
+ GenericTestUtils.waitFor(
+ () -> keyDeletingService.getRunCount().get() >= 5,
+ 100, 1000);
+ // Since SCM calls are failing, deletedKeyCount should be zero.
+ Assert.assertEquals(keyDeletingService.getDeletedKeyCount().get(), 0);
+
+
+ }
+
+ private void createAndDeleteKeys(KeyManager keyManager, int keyCount)
+ throws IOException {
+ for (int x = 0; x < keyCount; x++) {
+ String volumeName = String.format("volume%s",
+ RandomStringUtils.randomAlphanumeric(5));
+ String bucketName = String.format("bucket%s",
+ RandomStringUtils.randomAlphanumeric(5));
+ String keyName = String.format("key%s",
+ RandomStringUtils.randomAlphanumeric(5));
+ byte[] volumeBytes =
+ keyManager.getMetadataManager().getVolumeKey(volumeName);
+ byte[] bucketBytes =
+ keyManager.getMetadataManager().getBucketKey(volumeName, bucketName);
+ // cheat here, just create a volume and bucket entry so that we can
+ // create the keys, we put the same data for key and value since the
+ // system does not decode the object
+ keyManager.getMetadataManager().getVolumeTable().put(volumeBytes,
+ volumeBytes);
+
+ keyManager.getMetadataManager().getBucketTable().put(bucketBytes,
+ bucketBytes);
+
+ OmKeyArgs arg =
+ new OmKeyArgs.Builder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucketName)
+ .setKeyName(keyName)
+ .build();
+ //Open, Commit and Delete the Keys in the Key Manager.
+ OpenKeySession session = keyManager.openKey(arg);
+ arg.addLocationInfo(keyManager.allocateBlock(arg, session.getId()));
+ keyManager.commitKey(arg, session.getId());
+ keyManager.deleteKey(arg);
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org