You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by bh...@apache.org on 2020/11/25 15:42:03 UTC

[ozone] branch master updated: HDDS-4481. With HA OM can send deletion blocks to SCM multiple times. (#1608)

This is an automated email from the ASF dual-hosted git repository.

bharat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new d83ec1a  HDDS-4481. With HA OM can send deletion blocks to SCM multiple times. (#1608)
d83ec1a is described below

commit d83ec1af165677d7c063ff574d32ae2abdd4ff5d
Author: Bharat Viswanadham <bh...@apache.org>
AuthorDate: Wed Nov 25 07:41:49 2020 -0800

    HDDS-4481. With HA OM can send deletion blocks to SCM multiple times. (#1608)
---
 .../apache/hadoop/ozone/om/TestOzoneManagerHA.java | 47 +++++++++++++
 .../ozone/om/TestOzoneManagerHAKeyDeletion.java    | 77 ++++++++++++++++++++++
 .../ozone/om/TestOzoneManagerHAWithData.java       | 37 -----------
 .../apache/hadoop/ozone/om/KeyDeletingService.java | 19 +++++-
 .../ozone/om/ratis/OzoneManagerRatisServer.java    | 35 ++++++++--
 5 files changed, 172 insertions(+), 43 deletions(-)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index daca5c3..4a2ccbb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -26,10 +26,12 @@ import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.VolumeArgs;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.rpc.RpcClient;
 import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
@@ -51,9 +53,11 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONN
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_RETRY_INTERVAL_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
 
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
 import static org.junit.Assert.fail;
 
 /**
@@ -139,6 +143,12 @@ public abstract class TestOzoneManagerHA {
     conf.setLong(
         OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
         SNAPSHOT_THRESHOLD);
+
+    /**
+     * config for key deleting service.
+     */
+    conf.set(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, "10s");
+    conf.set(OZONE_KEY_DELETING_LIMIT_PER_TASK, "2");
     cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
         .setClusterId(clusterId)
         .setScmId(scmId)
@@ -261,4 +271,41 @@ public abstract class TestOzoneManagerHA {
       }
     }
   }
+
+  /**
+   * Creates a file with the given data and verifies that it was created
+   * successfully.
+   * @param ozoneBucket
+   * @param keyName
+   * @param data
+   * @param recursive
+   * @param overwrite
+   * @throws Exception
+   */
+  protected void testCreateFile(OzoneBucket ozoneBucket, String keyName,
+      String data, boolean recursive, boolean overwrite)
+      throws Exception {
+
+    OzoneOutputStream ozoneOutputStream = ozoneBucket.createFile(keyName,
+        data.length(), ReplicationType.RATIS, ReplicationFactor.ONE,
+        overwrite, recursive);
+
+    ozoneOutputStream.write(data.getBytes(), 0, data.length());
+    ozoneOutputStream.close();
+
+    OzoneKeyDetails ozoneKeyDetails = ozoneBucket.getKey(keyName);
+
+    Assert.assertEquals(keyName, ozoneKeyDetails.getName());
+    Assert.assertEquals(ozoneBucket.getName(), ozoneKeyDetails.getBucketName());
+    Assert.assertEquals(ozoneBucket.getVolumeName(),
+        ozoneKeyDetails.getVolumeName());
+    Assert.assertEquals(data.length(), ozoneKeyDetails.getDataSize());
+
+    OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName);
+
+    byte[] fileContent = new byte[data.getBytes().length];
+    ozoneInputStream.read(fileContent);
+    Assert.assertEquals(data, new String(fileContent));
+  }
+
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAKeyDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAKeyDeletion.java
new file mode 100644
index 0000000..52449a2
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAKeyDeletion.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.fail;
+
+public class TestOzoneManagerHAKeyDeletion extends TestOzoneManagerHA {
+
+  @Test
+  public void testKeyDeletion() throws Exception {
+    OzoneBucket ozoneBucket = setupBucket();
+    String data = "random data";
+    String keyName1 = "dir/file1";
+    String keyName2 = "dir/file2";
+    String keyName3 = "dir/file3";
+    String keyName4 = "dir/file4";
+    List<String> keyList1 = new ArrayList<>();
+    keyList1.add(keyName2);
+    keyList1.add(keyName3);
+
+    testCreateFile(ozoneBucket, keyName1, data, true, false);
+    testCreateFile(ozoneBucket, keyName2, data, true, false);
+    testCreateFile(ozoneBucket, keyName3, data, true, false);
+    testCreateFile(ozoneBucket, keyName4, data, true, false);
+
+    ozoneBucket.deleteKey(keyName1);
+    ozoneBucket.deleteKey(keyName2);
+    ozoneBucket.deleteKey(keyName3);
+    ozoneBucket.deleteKey(keyName4);
+
+    // Now check that the entries have been removed from the delete table.
+
+    OzoneManager ozoneManager = getCluster().getOMLeader();
+
+    KeyDeletingService keyDeletingService =
+        (KeyDeletingService) ozoneManager.getKeyManager().getDeletingService();
+
+    // Check on leader OM Count.
+    GenericTestUtils.waitFor(() ->
+        keyDeletingService.getRunCount().get() >= 2, 10000, 120000);
+    GenericTestUtils.waitFor(() ->
+            keyDeletingService.getDeletedKeyCount().get() == 4, 10000, 120000);
+
+    // Check that the delete table is empty on all OMs.
+    getCluster().getOzoneManagersList().forEach((om) -> {
+      try {
+        GenericTestUtils.waitFor(() ->
+                !om.getMetadataManager().getDeletedTable().iterator().hasNext(),
+            10000, 120000);
+      } catch (Exception ex) {
+        fail("TestOzoneManagerHAKeyDeletion failed");
+      }
+    });
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java
index aed84f5..3a3d82b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
-import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.VolumeArgs;
@@ -251,42 +250,6 @@ public class TestOzoneManagerHAWithData extends TestOzoneManagerHA {
 
   }
 
-  /**
-   * This method createFile and verifies the file is successfully created or
-   * not.
-   * @param ozoneBucket
-   * @param keyName
-   * @param data
-   * @param recursive
-   * @param overwrite
-   * @throws Exception
-   */
-  public void testCreateFile(OzoneBucket ozoneBucket, String keyName,
-      String data, boolean recursive, boolean overwrite)
-      throws Exception {
-
-    OzoneOutputStream ozoneOutputStream = ozoneBucket.createFile(keyName,
-        data.length(), ReplicationType.RATIS, ReplicationFactor.ONE,
-        overwrite, recursive);
-
-    ozoneOutputStream.write(data.getBytes(), 0, data.length());
-    ozoneOutputStream.close();
-
-    OzoneKeyDetails ozoneKeyDetails = ozoneBucket.getKey(keyName);
-
-    Assert.assertEquals(keyName, ozoneKeyDetails.getName());
-    Assert.assertEquals(ozoneBucket.getName(), ozoneKeyDetails.getBucketName());
-    Assert.assertEquals(ozoneBucket.getVolumeName(),
-        ozoneKeyDetails.getVolumeName());
-    Assert.assertEquals(data.length(), ozoneKeyDetails.getDataSize());
-
-    OzoneInputStream ozoneInputStream = ozoneBucket.readKey(keyName);
-
-    byte[] fileContent = new byte[data.getBytes().length];
-    ozoneInputStream.read(fileContent);
-    Assert.assertEquals(data, new String(fileContent));
-  }
-
   @Test
   public void testMultipartUploadWithOneOmNodeDown() throws Exception {
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
index 345a446..466a55f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyDeletingService.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeletedKeys;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeKeysRequest;
@@ -51,6 +52,8 @@ import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.util.Preconditions;
 import org.rocksdb.RocksDBException;
 import org.slf4j.Logger;
@@ -74,7 +77,7 @@ public class KeyDeletingService extends BackgroundService {
   private final OzoneManager ozoneManager;
   private final ScmBlockLocationProtocol scmClient;
   private final KeyManager manager;
-  private ClientId clientId = ClientId.randomId();
+  private static ClientId clientId = ClientId.randomId();
   private final int keyLimitPerTask;
   private final AtomicLong deletedKeyCount;
   private final AtomicLong runCount;
@@ -264,7 +267,10 @@ public class KeyDeletingService extends BackgroundService {
 
       // Submit PurgeKeys request to OM
       try {
-        ozoneManager.getOmServerProtocol().submitRequest(null, omRequest);
+        RaftClientRequest raftClientRequest =
+            createRaftClientRequestForPurge(omRequest);
+        ozoneManager.getOmRatisServer().submitRequest(omRequest,
+            raftClientRequest);
       } catch (ServiceException e) {
         LOG.error("PurgeKey request failed. Will retry at next run.");
         return 0;
@@ -274,6 +280,15 @@ public class KeyDeletingService extends BackgroundService {
     }
   }
 
+  private RaftClientRequest createRaftClientRequestForPurge(
+      OMRequest omRequest) {
+    return new RaftClientRequest(clientId,
+        ozoneManager.getOmRatisServer().getRaftPeerId(),
+        ozoneManager.getOmRatisServer().getRaftGroupId(), runCount.get(),
+        Message.valueOf(OMRatisHelper.convertRequestToByteString(omRequest)),
+        RaftClientRequest.writeRequestType(), null);
+  }
+
   /**
    * Parse Volume and Bucket Name from ObjectKey and add it to given map of
    * keys to be purged per bucket.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index 0bf58ba..467764b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.server.ServerUtils;
@@ -86,6 +87,9 @@ import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.ipc.RpcConstants.DUMMY_CLIENT_ID;
+import static org.apache.hadoop.ipc.RpcConstants.INVALID_CALL_ID;
+
 /**
  * Creates a Ratis server endpoint for OM.
  */
@@ -126,15 +130,32 @@ public final class OzoneManagerRatisServer {
   public OMResponse submitRequest(OMRequest omRequest) throws ServiceException {
     RaftClientRequest raftClientRequest =
         createWriteRaftClientRequest(omRequest);
-    RaftClientReply raftClientReply;
+    RaftClientReply raftClientReply = submitRequestToRatis(raftClientRequest);
+    return processReply(omRequest, raftClientReply);
+  }
+
+  /**
+   * API used internally by the OzoneManager server when a request needs to be
+   * submitted to Ratis with a caller-crafted RaftClientRequest.
+   * @param omRequest
+   * @param raftClientRequest
+   * @return OMResponse
+   * @throws ServiceException
+   */
+  public OMResponse submitRequest(OMRequest omRequest,
+      RaftClientRequest raftClientRequest) throws ServiceException {
+    RaftClientReply raftClientReply = submitRequestToRatis(raftClientRequest);
+    return processReply(omRequest, raftClientReply);
+  }
+
+  private RaftClientReply submitRequestToRatis(
+      RaftClientRequest raftClientRequest) throws ServiceException {
     try {
-      raftClientReply = server.submitClientRequestAsync(raftClientRequest)
+      return server.submitClientRequestAsync(raftClientRequest)
           .get();
     } catch (Exception ex) {
       throw new ServiceException(ex.getMessage(), ex);
     }
-
-    return processReply(omRequest, raftClientReply);
   }
 
   /**
@@ -144,6 +165,8 @@ public final class OzoneManagerRatisServer {
    * ratis server.
    */
   private RaftClientRequest createWriteRaftClientRequest(OMRequest omRequest) {
+    Preconditions.checkArgument(Server.getClientId() != DUMMY_CLIENT_ID);
+    Preconditions.checkArgument(Server.getCallId() != INVALID_CALL_ID);
     return new RaftClientRequest(
         ClientId.valueOf(UUID.nameUUIDFromBytes(Server.getClientId())),
         server.getId(), raftGroupId, Server.getCallId(),
@@ -714,4 +737,8 @@ public final class OzoneManagerRatisServer {
   public TermIndex getLastAppliedTermIndex() {
     return omStateMachine.getLastAppliedTermIndex();
   }
+
+  public RaftGroupId getRaftGroupId() {
+    return raftGroupId;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org