You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by bh...@apache.org on 2020/11/25 15:42:03 UTC
[ozone] branch master updated: HDDS-4481. With HA OM can send
deletion blocks to SCM multiple times. (#1608)
This is an automated email from the ASF dual-hosted git repository.
bharat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new d83ec1a HDDS-4481. With HA OM can send deletion blocks to SCM multiple times. (#1608)
d83ec1a is described below
commit d83ec1af165677d7c063ff574d32ae2abdd4ff5d
Author: Bharat Viswanadham <bh...@apache.org>
AuthorDate: Wed Nov 25 07:41:49 2020 -0800
HDDS-4481. With HA OM can send deletion blocks to SCM multiple times. (#1608)
---
.../apache/hadoop/ozone/om/TestOzoneManagerHA.java | 47 +++++++++++++
.../ozone/om/TestOzoneManagerHAKeyDeletion.java | 77 ++++++++++++++++++++++
.../ozone/om/TestOzoneManagerHAWithData.java | 37 -----------
.../apache/hadoop/ozone/om/KeyDeletingService.java | 19 +++++-
.../ozone/om/ratis/OzoneManagerRatisServer.java | 35 ++++++++--
5 files changed, 172 insertions(+), 43 deletions(-)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index daca5c3..4a2ccbb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -26,10 +26,12 @@ import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.VolumeArgs;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.rpc.RpcClient;
import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
@@ -51,9 +53,11 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONN
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
import static org.junit.Assert.fail;
/**
@@ -139,6 +143,12 @@ public abstract class TestOzoneManagerHA {
conf.setLong(
OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
SNAPSHOT_THRESHOLD);
+
+ /**
+ * config for key deleting service.
+ */
+ conf.set(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, "10s");
+ conf.set(OZONE_KEY_DELETING_LIMIT_PER_TASK, "2");
cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
.setClusterId(clusterId)
.setScmId(scmId)
@@ -261,4 +271,41 @@ public abstract class TestOzoneManagerHA {
}
}
}
+
+ /**
+ * This method createFile and verifies the file is successfully created or
+ * not.
+ * @param ozoneBucket
+ * @param keyName
+ * @param data
+ * @param recursive
+ * @param overwrite
+ * @throws Exception
+ */
+ protected void testCreateFile(OzoneBucket ozoneBucket, String keyName,
+ String data, boolean recursive, boolean overwrite)
+ throws Exception {
+
+ OzoneOutputStream ozoneOutputStream = ozoneBucket.createFile(keyName,
+ data.length(), ReplicationType.RATIS, ReplicationFactor.ONE,
+ overwrite, recursive);
+
+ ozoneOutputStream.write(data.getBytes(), 0, data.length());
+ ozoneOutputStream.close();
+
+ OzoneKeyDetails ozoneKeyDetails = ozoneBucket.getKey(keyName);
+
+ Assert.assertEquals(keyName, ozoneKeyDetails.getName());
+ Assert.assertEquals(ozoneBucket.getName(), ozoneKeyDetails.getBucketName());
+ Assert.assertEquals(ozoneBucket.getVolumeName(),
+ ozoneKeyDetails.getVolumeName());
+ Assert.assertEquals(data.length(), ozoneKeyDetails.getDataSize());
+
+ OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName);
+
+ byte[] fileContent = new byte[data.getBytes().length];
+ ozoneInputStream.read(fileContent);
+ Assert.assertEquals(data, new String(fileContent));
+ }
+
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAKeyDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAKeyDeletion.java
new file mode 100644
index 0000000..52449a2
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAKeyDeletion.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.fail;
+
+public class TestOzoneManagerHAKeyDeletion extends TestOzoneManagerHA {
+
+ @Test
+ public void testKeyDeletion() throws Exception {
+ OzoneBucket ozoneBucket = setupBucket();
+ String data = "random data";
+ String keyName1 = "dir/file1";
+ String keyName2 = "dir/file2";
+ String keyName3 = "dir/file3";
+ String keyName4 = "dir/file4";
+ List<String> keyList1 = new ArrayList<>();
+ keyList1.add(keyName2);
+ keyList1.add(keyName3);
+
+ testCreateFile(ozoneBucket, keyName1, data, true, false);
+ testCreateFile(ozoneBucket, keyName2, data, true, false);
+ testCreateFile(ozoneBucket, keyName3, data, true, false);
+ testCreateFile(ozoneBucket, keyName4, data, true, false);
+
+ ozoneBucket.deleteKey(keyName1);
+ ozoneBucket.deleteKey(keyName2);
+ ozoneBucket.deleteKey(keyName3);
+ ozoneBucket.deleteKey(keyName4);
+
+ // Now check delete table has entries been removed.
+
+ OzoneManager ozoneManager = getCluster().getOMLeader();
+
+ KeyDeletingService keyDeletingService =
+ (KeyDeletingService) ozoneManager.getKeyManager().getDeletingService();
+
+ // Check on leader OM Count.
+ GenericTestUtils.waitFor(() ->
+ keyDeletingService.getRunCount().get() >= 2, 10000, 120000);
+ GenericTestUtils.waitFor(() ->
+ keyDeletingService.getDeletedKeyCount().get() == 4, 10000, 120000);
+
+ // Check delete table is empty or not on all OMs.
+ getCluster().getOzoneManagersList().forEach((om) -> {
+ try {
+ GenericTestUtils.waitFor(() ->
+ !om.getMetadataManager().getDeletedTable().iterator().hasNext(),
+ 10000, 120000);
+ } catch (Exception ex) {
+ fail("TestOzoneManagerHAKeyDeletion failed");
+ }
+ });
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java
index aed84f5..3a3d82b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneBucket;
-import org.apache.hadoop.ozone.client.OzoneKeyDetails;
import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.VolumeArgs;
@@ -251,42 +250,6 @@ public class TestOzoneManagerHAWithData extends TestOzoneManagerHA {
}
- /**
- * This method createFile and verifies the file is successfully created or
- * not.
- * @param ozoneBucket
- * @param keyName
- * @param data
- * @param recursive
- * @param overwrite
- * @throws Exception
- */
- public void testCreateFile(OzoneBucket ozoneBucket, String keyName,
- String data, boolean recursive, boolean overwrite)
- throws Exception {
-
- OzoneOutputStream ozoneOutputStream = ozoneBucket.createFile(keyName,
- data.length(), ReplicationType.RATIS, ReplicationFactor.ONE,
- overwrite, recursive);
-
- ozoneOutputStream.write(data.getBytes(), 0, data.length());
- ozoneOutputStream.close();
-
- OzoneKeyDetails ozoneKeyDetails = ozoneBucket.getKey(keyName);
-
- Assert.assertEquals(keyName, ozoneKeyDetails.getName());
- Assert.assertEquals(ozoneBucket.getName(), ozoneKeyDetails.getBucketName());
- Assert.assertEquals(ozoneBucket.getVolumeName(),
- ozoneKeyDetails.getVolumeName());
- Assert.assertEquals(data.length(), ozoneKeyDetails.getDataSize());
-
- OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName);
-
- byte[] fileContent = new byte[data.getBytes().length];
- ozoneInputStream.read(fileContent);
- Assert.assertEquals(data, new String(fileContent));
- }
-
@Test
public void testMultipartUploadWithOneOmNodeDown() throws Exception {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
index 345a446..466a55f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest;
@@ -51,6 +52,8 @@ import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.util.Preconditions;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
@@ -74,7 +77,7 @@ public class KeyDeletingService extends BackgroundService {
private final OzoneManager ozoneManager;
private final ScmBlockLocationProtocol scmClient;
private final KeyManager manager;
- private ClientId clientId = ClientId.randomId();
+ private static ClientId clientId = ClientId.randomId();
private final int keyLimitPerTask;
private final AtomicLong deletedKeyCount;
private final AtomicLong runCount;
@@ -264,7 +267,10 @@ public class KeyDeletingService extends BackgroundService {
// Submit PurgeKeys request to OM
try {
- ozoneManager.getOmServerProtocol().submitRequest(null, omRequest);
+ RaftClientRequest raftClientRequest =
+ createRaftClientRequestForPurge(omRequest);
+ ozoneManager.getOmRatisServer().submitRequest(omRequest,
+ raftClientRequest);
} catch (ServiceException e) {
LOG.error("PurgeKey request failed. Will retry at next run.");
return 0;
@@ -274,6 +280,15 @@ public class KeyDeletingService extends BackgroundService {
}
}
+ private RaftClientRequest createRaftClientRequestForPurge(
+ OMRequest omRequest) {
+ return new RaftClientRequest(clientId,
+ ozoneManager.getOmRatisServer().getRaftPeerId(),
+ ozoneManager.getOmRatisServer().getRaftGroupId(), runCount.get(),
+ Message.valueOf(OMRatisHelper.convertRequestToByteString(omRequest)),
+ RaftClientRequest.writeRequestType(), null);
+ }
+
/**
* Parse Volume and Bucket Name from ObjectKey and add it to given map of
* keys to be purged per bucket.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index 0bf58ba..467764b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.server.ServerUtils;
@@ -86,6 +87,9 @@ import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.ipc.RpcConstants.DUMMY_CLIENT_ID;
+import static org.apache.hadoop.ipc.RpcConstants.INVALID_CALL_ID;
+
/**
* Creates a Ratis server endpoint for OM.
*/
@@ -126,15 +130,32 @@ public final class OzoneManagerRatisServer {
public OMResponse submitRequest(OMRequest omRequest) throws ServiceException {
RaftClientRequest raftClientRequest =
createWriteRaftClientRequest(omRequest);
- RaftClientReply raftClientReply;
+ RaftClientReply raftClientReply = submitRequestToRatis(raftClientRequest);
+ return processReply(omRequest, raftClientReply);
+ }
+
+ /**
+ * API used internally from OzoneManager Server when requests needs to be
+ * submitted to ratis, where the crafted RaftClientRequest is passed along.
+ * @param omRequest
+ * @param raftClientRequest
+ * @return OMResponse
+ * @throws ServiceException
+ */
+ public OMResponse submitRequest(OMRequest omRequest,
+ RaftClientRequest raftClientRequest) throws ServiceException {
+ RaftClientReply raftClientReply = submitRequestToRatis(raftClientRequest);
+ return processReply(omRequest, raftClientReply);
+ }
+
+ private RaftClientReply submitRequestToRatis(
+ RaftClientRequest raftClientRequest) throws ServiceException {
try {
- raftClientReply = server.submitClientRequestAsync(raftClientRequest)
+ return server.submitClientRequestAsync(raftClientRequest)
.get();
} catch (Exception ex) {
throw new ServiceException(ex.getMessage(), ex);
}
-
- return processReply(omRequest, raftClientReply);
}
/**
@@ -144,6 +165,8 @@ public final class OzoneManagerRatisServer {
* ratis server.
*/
private RaftClientRequest createWriteRaftClientRequest(OMRequest omRequest) {
+ Preconditions.checkArgument(Server.getClientId() != DUMMY_CLIENT_ID);
+ Preconditions.checkArgument(Server.getCallId() != INVALID_CALL_ID);
return new RaftClientRequest(
ClientId.valueOf(UUID.nameUUIDFromBytes(Server.getClientId())),
server.getId(), raftGroupId, Server.getCallId(),
@@ -714,4 +737,8 @@ public final class OzoneManagerRatisServer {
public TermIndex getLastAppliedTermIndex() {
return omStateMachine.getLastAppliedTermIndex();
}
+
+ public RaftGroupId getRaftGroupId() {
+ return raftGroupId;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org