You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by lj...@apache.org on 2019/03/11 09:25:33 UTC
[hadoop] branch trunk updated: HDDS-1220. KeyManager#openKey should
release the bucket lock before doing an allocateBlock. Contributed by
Lokesh Jain.
This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new ebb5fa1 HDDS-1220. KeyManager#openKey should release the bucket lock before doing an allocateBlock. Contributed by Lokesh Jain.
ebb5fa1 is described below
commit ebb5fa115b842a3bcb747e881ca2a8fe4c716d07
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Mon Mar 11 14:54:44 2019 +0530
HDDS-1220. KeyManager#openKey should release the bucket lock before doing an allocateBlock. Contributed by Lokesh Jain.
---
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 126 +++++------
.../apache/hadoop/ozone/om/TestKeyManagerImpl.java | 230 ++++++++++-----------
2 files changed, 161 insertions(+), 195 deletions(-)
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index 5caeaea..6500a5e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -240,37 +240,49 @@ public class KeyManagerImpl implements KeyManager {
OMException.ResultCodes.KEY_NOT_FOUND);
}
- AllocatedBlock allocatedBlock;
- try {
- allocatedBlock =
- scmBlockClient.allocateBlock(scmBlockSize, keyInfo.getType(),
- keyInfo.getFactor(), omId, excludeList);
- } catch (SCMException ex) {
- if (ex.getResult()
- .equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) {
- throw new OMException(ex.getMessage(), ResultCodes.SCM_IN_CHILL_MODE);
- }
- throw ex;
- }
- OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder()
- .setBlockID(new BlockID(allocatedBlock.getBlockID()))
- .setLength(scmBlockSize)
- .setOffset(0);
- if (grpcBlockTokenEnabled) {
- String remoteUser = getRemoteUser().getShortUserName();
- builder.setToken(secretManager.generateToken(remoteUser,
- allocatedBlock.getBlockID().toString(),
- getAclForUser(remoteUser),
- scmBlockSize));
- }
- OmKeyLocationInfo info = builder.build();
// current version not committed, so new blocks coming now are added to
// the same version
- keyInfo.appendNewBlocks(Collections.singletonList(info));
+ List<OmKeyLocationInfo> locationInfos =
+ allocateBlock(keyInfo, excludeList, scmBlockSize);
+ keyInfo.appendNewBlocks(locationInfos);
keyInfo.updateModifcationTime();
metadataManager.getOpenKeyTable().put(openKey,
keyInfo);
- return info;
+ return locationInfos.get(0);
+ }
+
+ private List<OmKeyLocationInfo> allocateBlock(OmKeyInfo keyInfo,
+ ExcludeList excludeList, long requestedSize) throws IOException {
+ int numBlocks = (int) ((requestedSize - 1) / scmBlockSize + 1);
+ List<OmKeyLocationInfo> locationInfos = new ArrayList<>(numBlocks);
+ while (requestedSize > 0) {
+ long allocateSize = Math.min(requestedSize, scmBlockSize);
+ AllocatedBlock allocatedBlock;
+ try {
+ allocatedBlock = scmBlockClient
+ .allocateBlock(allocateSize, keyInfo.getType(), keyInfo.getFactor(),
+ omId, excludeList);
+ } catch (SCMException ex) {
+ if (ex.getResult()
+ .equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) {
+ throw new OMException(ex.getMessage(), ResultCodes.SCM_IN_CHILL_MODE);
+ }
+ throw ex;
+ }
+ OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder()
+ .setBlockID(new BlockID(allocatedBlock.getBlockID()))
+ .setLength(scmBlockSize)
+ .setOffset(0);
+ if (grpcBlockTokenEnabled) {
+ String remoteUser = getRemoteUser().getShortUserName();
+ builder.setToken(secretManager
+ .generateToken(remoteUser, allocatedBlock.getBlockID().toString(),
+ getAclForUser(remoteUser), scmBlockSize));
+ }
+ locationInfos.add(builder.build());
+ requestedSize -= allocateSize;
+ }
+ return locationInfos;
}
/* Optimize ugi lookup for RPC operations to avoid a trip through
@@ -327,6 +339,10 @@ public class KeyManagerImpl implements KeyManager {
ReplicationFactor factor = args.getFactor();
ReplicationType type = args.getType();
long currentTime = Time.monotonicNowNanos();
+ long requestedSize = Math.min(preallocateMax, args.getDataSize());
+ OmKeyInfo keyInfo;
+ String openKey;
+ long openVersion;
FileEncryptionInfo encInfo = null;
OmBucketInfo bucketInfo = getBucketInfo(volumeName, bucketName);
@@ -378,55 +394,14 @@ public class KeyManagerImpl implements KeyManager {
type = useRatis ? ReplicationType.RATIS : ReplicationType.STAND_ALONE;
}
}
- long requestedSize = Math.min(preallocateMax, args.getDataSize());
List<OmKeyLocationInfo> locations = new ArrayList<>();
String objectKey = metadataManager.getOzoneKey(
volumeName, bucketName, keyName);
- // requested size is not required but more like a optimization:
- // SCM looks at the requested, if it 0, no block will be allocated at
- // the point, if client needs more blocks, client can always call
- // allocateBlock. But if requested size is not 0, OM will preallocate
- // some blocks and piggyback to client, to save RPC calls.
- while (requestedSize > 0) {
- long allocateSize = Math.min(scmBlockSize, requestedSize);
- AllocatedBlock allocatedBlock;
- try {
- allocatedBlock = scmBlockClient
- .allocateBlock(allocateSize, type, factor, omId,
- new ExcludeList());
- } catch (IOException ex) {
- if (ex instanceof SCMException) {
- if (((SCMException) ex).getResult()
- .equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) {
- throw new OMException(ex.getMessage(),
- ResultCodes.SCM_IN_CHILL_MODE);
- }
- }
- throw ex;
- }
- OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder()
- .setBlockID(new BlockID(allocatedBlock.getBlockID()))
- .setLength(allocateSize)
- .setOffset(0);
- if (grpcBlockTokenEnabled) {
- String remoteUser = getRemoteUser().getShortUserName();
- builder.setToken(secretManager.generateToken(remoteUser,
- allocatedBlock.getBlockID().toString(),
- getAclForUser(remoteUser),
- scmBlockSize));
- }
-
- OmKeyLocationInfo subKeyInfo = builder.build();
- locations.add(subKeyInfo);
- requestedSize -= allocateSize;
- }
// NOTE size of a key is not a hard limit on anything, it is a value that
// client should expect, in terms of current size of key. If client sets a
// value, then this value is used, otherwise, we allocate a single block
// which is the current size, if read by the client.
long size = args.getDataSize() >= 0 ? args.getDataSize() : scmBlockSize;
- OmKeyInfo keyInfo;
- long openVersion;
if (args.getIsMultipartKey()) {
// For this upload part we don't need to check in KeyTable. As this
@@ -449,7 +424,7 @@ public class KeyManagerImpl implements KeyManager {
openVersion = 0;
}
}
- String openKey = metadataManager.getOpenKey(
+ openKey = metadataManager.getOpenKey(
volumeName, bucketName, keyName, currentTime);
if (metadataManager.getOpenKeyTable().get(openKey) != null) {
// This should not happen. If this condition is satisfied, it means
@@ -464,10 +439,8 @@ public class KeyManagerImpl implements KeyManager {
throw new OMException("Cannot allocate key. Not able to get a valid" +
"open key id.", ResultCodes.KEY_ALLOCATION_ERROR);
}
- metadataManager.getOpenKeyTable().put(openKey, keyInfo);
LOG.debug("Key {} allocated in volume {} bucket {}",
keyName, volumeName, bucketName);
- return new OpenKeySession(currentTime, keyInfo, openVersion);
} catch (OMException e) {
throw e;
} catch (IOException ex) {
@@ -478,6 +451,19 @@ public class KeyManagerImpl implements KeyManager {
} finally {
metadataManager.getLock().releaseBucketLock(volumeName, bucketName);
}
+
+ // The requested size is not required; it is an optimization.
+ // If the requested size is 0, no block will be allocated at
+ // this point; if the client needs more blocks, it can always call
+ // allocateBlock. But if the requested size is not 0, OM will preallocate
+ // some blocks and piggyback them to the client, to save RPC calls.
+ if (requestedSize > 0) {
+ List<OmKeyLocationInfo> locationInfos =
+ allocateBlock(keyInfo, new ExcludeList(), requestedSize);
+ keyInfo.appendNewBlocks(locationInfos);
+ }
+ metadataManager.getOpenKeyTable().put(openKey, keyInfo);
+ return new OpenKeySession(currentTime, keyInfo, openVersion);
}
/**
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
index 992ccaf..a76d052 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java
@@ -18,177 +18,141 @@
package org.apache.hadoop.ozone.om;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.ozone.om.codec.OmBucketInfoCodec;
-import org.apache.hadoop.ozone.om.codec.OmKeyInfoCodec;
-import org.apache.hadoop.ozone.om.codec.OmVolumeArgsCodec;
+import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
-import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
-import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyInfo;
+import org.apache.hadoop.ozone.om.helpers.*;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.utils.db.CodecRegistry;
-import org.apache.hadoop.utils.db.RDBStore;
-import org.apache.hadoop.utils.db.Table;
-import org.apache.hadoop.utils.db.TableConfig;
-import org.junit.Before;
-import org.junit.Rule;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DBOptions;
-import org.rocksdb.RocksDB;
-import org.rocksdb.Statistics;
-import org.rocksdb.StatsLevel;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.*;
/**
* Test class for @{@link KeyManagerImpl}.
- * */
+ */
public class TestKeyManagerImpl {
private static KeyManagerImpl keyManager;
- private static ScmBlockLocationProtocol scmBlockLocationProtocol;
+ private static VolumeManagerImpl volumeManager;
+ private static BucketManagerImpl bucketManager;
+ private static StorageContainerManager scm;
+ private static ScmBlockLocationProtocol mockScmBlockLocationProtocol;
private static OzoneConfiguration conf;
private static OMMetadataManager metadataManager;
- private static long blockSize = 1000;
+ private static File dir;
+ private static long scmBlockSize;
private static final String KEY_NAME = "key1";
private static final String BUCKET_NAME = "bucket1";
private static final String VOLUME_NAME = "vol1";
- private static RDBStore rdbStore = null;
- private static Table<String, OmKeyInfo> keyTable = null;
- private static Table<String, OmBucketInfo> bucketTable = null;
- private static Table<String, OmVolumeArgs> volumeTable = null;
- private static DBOptions options = null;
- private KeyInfo keyData;
- @Rule
- public TemporaryFolder folder = new TemporaryFolder();
- @Before
- public void setUp() throws Exception {
+ @BeforeClass
+ public static void setUp() throws Exception {
conf = new OzoneConfiguration();
- scmBlockLocationProtocol = Mockito.mock(ScmBlockLocationProtocol.class);
- metadataManager = Mockito.mock(OMMetadataManager.class);
- keyManager = new KeyManagerImpl(scmBlockLocationProtocol, metadataManager,
- conf, "om1", null);
- setupMocks();
- }
-
- private void setupMocks() throws Exception {
- Mockito.when(scmBlockLocationProtocol
+ dir = GenericTestUtils.getRandomizedTestDir();
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.toString());
+ mockScmBlockLocationProtocol = Mockito.mock(ScmBlockLocationProtocol.class);
+ metadataManager = new OmMetadataManagerImpl(conf);
+ volumeManager = new VolumeManagerImpl(metadataManager, conf);
+ bucketManager = new BucketManagerImpl(metadataManager);
+ NodeManager nodeManager = new MockNodeManager(true, 10);
+ SCMConfigurator configurator = new SCMConfigurator();
+ configurator.setScmNodeManager(nodeManager);
+ scm = TestUtils.getScm(conf, configurator);
+ scm.start();
+ scm.exitChillMode();
+ scmBlockSize = (long) conf
+ .getStorageSize(OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT,
+ StorageUnit.BYTES);
+ conf.setLong(OZONE_KEY_PREALLOCATION_MAXSIZE, scmBlockSize * 10);
+
+ keyManager =
+ new KeyManagerImpl(scm.getBlockProtocolServer(), metadataManager, conf,
+ "om1", null);
+ Mockito.when(mockScmBlockLocationProtocol
.allocateBlock(Mockito.anyLong(), Mockito.any(ReplicationType.class),
Mockito.any(ReplicationFactor.class), Mockito.anyString(),
- Mockito.any(ExcludeList.class)))
- .thenThrow(
- new SCMException("ChillModePrecheck failed for allocateBlock",
- ResultCodes.CHILL_MODE_EXCEPTION));
- setupRocksDb();
- Mockito.when(metadataManager.getVolumeTable()).thenReturn(volumeTable);
- Mockito.when(metadataManager.getBucketTable()).thenReturn(bucketTable);
- Mockito.when(metadataManager.getOpenKeyTable()).thenReturn(keyTable);
- Mockito.when(metadataManager.getLock())
- .thenReturn(new OzoneManagerLock(conf));
- Mockito.when(metadataManager.getVolumeKey(VOLUME_NAME))
- .thenReturn(VOLUME_NAME);
- Mockito.when(metadataManager.getBucketKey(VOLUME_NAME, BUCKET_NAME))
- .thenReturn(BUCKET_NAME);
- Mockito.when(metadataManager.getOpenKey(VOLUME_NAME, BUCKET_NAME,
- KEY_NAME, 1)).thenReturn(KEY_NAME);
+ Mockito.any(ExcludeList.class))).thenThrow(
+ new SCMException("ChillModePrecheck failed for allocateBlock",
+ ResultCodes.CHILL_MODE_EXCEPTION));
+ createVolume(VOLUME_NAME);
+ createBucket(VOLUME_NAME, BUCKET_NAME);
}
- private void setupRocksDb() throws Exception {
- options = new DBOptions();
- options.setCreateIfMissing(true);
- options.setCreateMissingColumnFamilies(true);
-
- Statistics statistics = new Statistics();
- statistics.setStatsLevel(StatsLevel.ALL);
- options = options.setStatistics(statistics);
+ @AfterClass
+ public static void cleanup() throws Exception {
+ scm.stop();
+ scm.join();
+ metadataManager.stop();
+ keyManager.stop();
+ FileUtils.deleteDirectory(dir);
+ }
- Set<TableConfig> configSet = new HashSet<>();
- for (String name : Arrays
- .asList(DFSUtil.bytes2String(RocksDB.DEFAULT_COLUMN_FAMILY),
- "testKeyTable", "testBucketTable", "testVolumeTable")) {
- TableConfig newConfig = new TableConfig(name, new ColumnFamilyOptions());
- configSet.add(newConfig);
- }
- keyData = KeyInfo.newBuilder()
- .setKeyName(KEY_NAME)
- .setBucketName(BUCKET_NAME)
- .setVolumeName(VOLUME_NAME)
- .setDataSize(blockSize)
- .setType(ReplicationType.STAND_ALONE)
- .setFactor(ReplicationFactor.ONE)
- .setCreationTime(Time.now())
- .setModificationTime(Time.now())
+ private static void createBucket(String volumeName, String bucketName)
+ throws IOException {
+ OmBucketInfo bucketInfo = OmBucketInfo.newBuilder()
+ .setVolumeName(volumeName)
+ .setBucketName(bucketName)
.build();
+ bucketManager.createBucket(bucketInfo);
+ }
- CodecRegistry registry = new CodecRegistry();
- registry.addCodec(OmKeyInfo.class, new OmKeyInfoCodec());
- registry.addCodec(OmVolumeArgs.class, new OmVolumeArgsCodec());
- registry.addCodec(OmBucketInfo.class, new OmBucketInfoCodec());
- rdbStore = new RDBStore(folder.newFolder(), options, configSet, registry);
-
- keyTable =
- rdbStore.getTable("testKeyTable", String.class, OmKeyInfo.class);
-
- bucketTable =
- rdbStore.getTable("testBucketTable", String.class, OmBucketInfo.class);
-
- volumeTable =
- rdbStore.getTable("testVolumeTable", String.class, OmVolumeArgs.class);
-
- volumeTable.put(VOLUME_NAME, OmVolumeArgs.newBuilder()
- .setAdminName("a")
- .setOwnerName("o")
- .setVolume(VOLUME_NAME)
- .build());
-
- bucketTable.put(BUCKET_NAME,
- new OmBucketInfo.Builder().setBucketName(BUCKET_NAME)
- .setVolumeName(VOLUME_NAME).build());
-
- keyTable.put(KEY_NAME, new OmKeyInfo.Builder()
- .setVolumeName(VOLUME_NAME)
- .setBucketName(BUCKET_NAME)
- .setKeyName(KEY_NAME)
- .setReplicationType(ReplicationType.STAND_ALONE)
- .setReplicationFactor(ReplicationFactor.THREE)
- .build());
-
+ private static void createVolume(String volumeName) throws IOException {
+ OmVolumeArgs volumeArgs = OmVolumeArgs.newBuilder()
+ .setVolume(volumeName)
+ .setAdminName("bilbo")
+ .setOwnerName("bilbo")
+ .build();
+ volumeManager.createVolume(volumeArgs);
}
@Test
public void allocateBlockFailureInChillMode() throws Exception {
- OmKeyArgs keyArgs = new OmKeyArgs.Builder().setKeyName(KEY_NAME)
+ KeyManager keyManager1 = new KeyManagerImpl(mockScmBlockLocationProtocol,
+ metadataManager, conf, "om1", null);
+ OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+ .setKeyName(KEY_NAME)
.setBucketName(BUCKET_NAME)
.setFactor(ReplicationFactor.ONE)
.setType(ReplicationType.STAND_ALONE)
- .setVolumeName(VOLUME_NAME).build();
+ .setVolumeName(VOLUME_NAME)
+ .build();
+ OpenKeySession keySession = keyManager1.openKey(keyArgs);
LambdaTestUtils.intercept(OMException.class,
"ChillModePrecheck failed for allocateBlock", () -> {
- keyManager.allocateBlock(keyArgs, 1, new ExcludeList());
+ keyManager1
+ .allocateBlock(keyArgs, keySession.getId(), new ExcludeList());
});
}
@Test
public void openKeyFailureInChillMode() throws Exception {
- OmKeyArgs keyArgs = new OmKeyArgs.Builder().setKeyName(KEY_NAME)
+ KeyManager keyManager1 = new KeyManagerImpl(mockScmBlockLocationProtocol,
+ metadataManager, conf, "om1", null);
+ OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+ .setKeyName(KEY_NAME)
.setBucketName(BUCKET_NAME)
.setFactor(ReplicationFactor.ONE)
.setDataSize(1000)
@@ -196,7 +160,23 @@ public class TestKeyManagerImpl {
.setVolumeName(VOLUME_NAME).build();
LambdaTestUtils.intercept(OMException.class,
"ChillModePrecheck failed for allocateBlock", () -> {
- keyManager.openKey(keyArgs);
+ keyManager1.openKey(keyArgs);
});
}
+
+ @Test
+ public void openKeyWithMultipleBlocks() throws IOException {
+ OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+ .setKeyName(UUID.randomUUID().toString())
+ .setBucketName(BUCKET_NAME)
+ .setFactor(ReplicationFactor.ONE)
+ .setDataSize(scmBlockSize * 10)
+ .setType(ReplicationType.STAND_ALONE)
+ .setVolumeName(VOLUME_NAME)
+ .build();
+ OpenKeySession keySession = keyManager.openKey(keyArgs);
+ OmKeyInfo keyInfo = keySession.getKeyInfo();
+ Assert.assertEquals(10,
+ keyInfo.getLatestVersionLocations().getLocationList().size());
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org