You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2019/03/15 17:41:11 UTC
[hadoop] branch trunk updated: HDDS-1138. Ozone Client should avoid
talking to SCM directly. Contributed by Xiaoyu Yao and Mukul Kumar Singh.
This is an automated email from the ASF dual-hosted git repository.
xyao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new d1afa03 HDDS-1138. Ozone Client should avoid talking to SCM directly. Contributed by Xiaoyu Yao and Mukul Kumar Singh.
d1afa03 is described below
commit d1afa038044f6aaec43551571caae567b2bf4139
Author: Xiaoyu Yao <xy...@apache.org>
AuthorDate: Fri Mar 15 10:41:06 2019 -0700
HDDS-1138. Ozone Client should avoid talking to SCM directly. Contributed by Xiaoyu Yao and Mukul Kumar Singh.
Closes #585
---
.../hadoop/ozone/client/io/KeyInputStream.java | 5 +--
.../hadoop/ozone/client/io/KeyOutputStream.java | 20 ++-------
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 2 -
.../apache/hadoop/ozone/om/helpers/OmKeyArgs.java | 16 +++++++-
.../hadoop/ozone/om/helpers/OmKeyLocationInfo.java | 48 +++++++++++++++++++---
.../src/main/proto/OzoneManagerProtocol.proto | 7 ++++
.../web/storage/DistributedStorageHandler.java | 1 -
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 46 +++++++++++++++++----
.../org/apache/hadoop/ozone/om/OzoneManager.java | 3 +-
.../java/org/apache/hadoop/ozone/om/ScmClient.java | 44 ++++++++++++++++++++
10 files changed, 154 insertions(+), 38 deletions(-)
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index d8bfdbd..3a92e01 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -277,9 +276,7 @@ public class KeyInputStream extends InputStream implements Seekable {
OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(i);
BlockID blockID = omKeyLocationInfo.getBlockID();
long containerID = blockID.getContainerID();
- ContainerWithPipeline containerWithPipeline =
- storageContainerLocationClient.getContainerWithPipeline(containerID);
- Pipeline pipeline = containerWithPipeline.getPipeline();
+ Pipeline pipeline = omKeyLocationInfo.getPipeline();
// irrespective of the container state, we will always read via Standalone
// protocol.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index a379889..3bd572d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -26,8 +26,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ChecksumType;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
@@ -73,7 +71,6 @@ public class KeyOutputStream extends OutputStream {
private final ArrayList<BlockOutputStreamEntry> streamEntries;
private int currentStreamIndex;
private final OzoneManagerProtocol omClient;
- private final StorageContainerLocationProtocol scmClient;
private final OmKeyArgs keyArgs;
private final long openID;
private final XceiverClientManager xceiverClientManager;
@@ -100,7 +97,6 @@ public class KeyOutputStream extends OutputStream {
public KeyOutputStream() {
streamEntries = new ArrayList<>();
omClient = null;
- scmClient = null;
keyArgs = null;
openID = -1;
xceiverClientManager = null;
@@ -136,6 +132,7 @@ public class KeyOutputStream extends OutputStream {
new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID())
.setLength(streamEntry.getCurrentPosition()).setOffset(0)
.setToken(streamEntry.getToken())
+ .setPipeline(streamEntry.getPipeline())
.build();
LOG.debug("block written " + streamEntry.getBlockID() + ", length "
+ streamEntry.getCurrentPosition() + " bcsID "
@@ -149,7 +146,6 @@ public class KeyOutputStream extends OutputStream {
@SuppressWarnings("parameternumber")
public KeyOutputStream(OpenKeySession handler,
XceiverClientManager xceiverClientManager,
- StorageContainerLocationProtocol scmClient,
OzoneManagerProtocol omClient, int chunkSize,
String requestId, ReplicationFactor factor, ReplicationType type,
long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout,
@@ -158,7 +154,6 @@ public class KeyOutputStream extends OutputStream {
this.streamEntries = new ArrayList<>();
this.currentStreamIndex = 0;
this.omClient = omClient;
- this.scmClient = scmClient;
OmKeyInfo info = handler.getKeyInfo();
// Retrieve the file encryption key info, null if file is not in
// encrypted bucket.
@@ -221,15 +216,14 @@ public class KeyOutputStream extends OutputStream {
private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo)
throws IOException {
- ContainerWithPipeline containerWithPipeline = scmClient
- .getContainerWithPipeline(subKeyInfo.getContainerID());
+ Preconditions.checkNotNull(subKeyInfo.getPipeline());
UserGroupInformation.getCurrentUser().addToken(subKeyInfo.getToken());
BlockOutputStreamEntry.Builder builder =
new BlockOutputStreamEntry.Builder()
.setBlockID(subKeyInfo.getBlockID())
.setKey(keyArgs.getKeyName())
.setXceiverClientManager(xceiverClientManager)
- .setPipeline(containerWithPipeline.getPipeline())
+ .setPipeline(subKeyInfo.getPipeline())
.setRequestId(requestID)
.setChunkSize(chunkSize)
.setLength(subKeyInfo.getLength())
@@ -637,7 +631,6 @@ public class KeyOutputStream extends OutputStream {
public static class Builder {
private OpenKeySession openHandler;
private XceiverClientManager xceiverManager;
- private StorageContainerLocationProtocol scmClient;
private OzoneManagerProtocol omClient;
private int chunkSize;
private String requestID;
@@ -675,11 +668,6 @@ public class KeyOutputStream extends OutputStream {
return this;
}
- public Builder setScmClient(StorageContainerLocationProtocol client) {
- this.scmClient = client;
- return this;
- }
-
public Builder setOmClient(
OzoneManagerProtocol client) {
this.omClient = client;
@@ -747,7 +735,7 @@ public class KeyOutputStream extends OutputStream {
}
public KeyOutputStream build() throws IOException {
- return new KeyOutputStream(openHandler, xceiverManager, scmClient,
+ return new KeyOutputStream(openHandler, xceiverManager,
omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
streamBufferMaxSize, blockSize, watchTimeout, checksumType,
bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey,
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index d059582..83d9ec2 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -602,7 +602,6 @@ public class RpcClient implements ClientProtocol {
new KeyOutputStream.Builder()
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
- .setScmClient(storageContainerLocationClient)
.setOmClient(ozoneManagerClient)
.setChunkSize(chunkSize)
.setRequestID(requestId)
@@ -865,7 +864,6 @@ public class RpcClient implements ClientProtocol {
new KeyOutputStream.Builder()
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
- .setScmClient(storageContainerLocationClient)
.setOmClient(ozoneManagerClient)
.setChunkSize(chunkSize)
.setRequestID(requestId)
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
index 9620600..d90345c 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
@@ -44,13 +44,14 @@ public final class OmKeyArgs implements Auditable {
private final String multipartUploadID;
private final int multipartUploadPartNumber;
private Map<String, String> metadata;
+ private boolean refreshPipeline;
@SuppressWarnings("parameternumber")
private OmKeyArgs(String volumeName, String bucketName, String keyName,
long dataSize, ReplicationType type, ReplicationFactor factor,
List<OmKeyLocationInfo> locationInfoList, boolean isMultipart,
String uploadID, int partNumber,
- Map<String, String> metadataMap) {
+ Map<String, String> metadataMap, boolean refreshPipeline) {
this.volumeName = volumeName;
this.bucketName = bucketName;
this.keyName = keyName;
@@ -62,6 +63,7 @@ public final class OmKeyArgs implements Auditable {
this.multipartUploadID = uploadID;
this.multipartUploadPartNumber = partNumber;
this.metadata = metadataMap;
+ this.refreshPipeline = refreshPipeline;
}
public boolean getIsMultipartKey() {
@@ -120,6 +122,10 @@ public final class OmKeyArgs implements Auditable {
return locationInfoList;
}
+ public boolean getRefreshPipeline() {
+ return refreshPipeline;
+ }
+
@Override
public Map<String, String> toAuditMap() {
Map<String, String> auditMap = new LinkedHashMap<>();
@@ -159,6 +165,7 @@ public final class OmKeyArgs implements Auditable {
private String multipartUploadID;
private int multipartUploadPartNumber;
private Map<String, String> metadata = new HashMap<>();
+ private boolean refreshPipeline;
public Builder setVolumeName(String volume) {
this.volumeName = volume;
@@ -220,10 +227,15 @@ public final class OmKeyArgs implements Auditable {
return this;
}
+ public Builder setRefreshPipeline(boolean refresh) {
+ this.refreshPipeline = refresh;
+ return this;
+ }
+
public OmKeyArgs build() {
return new OmKeyArgs(volumeName, bucketName, keyName, dataSize, type,
factor, locationInfoList, isMultipartKey, multipartUploadID,
- multipartUploadPartNumber, metadata);
+ multipartUploadPartNumber, metadata, refreshPipeline);
}
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java
index e4eb0f5..bbd1157 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java
@@ -17,6 +17,8 @@
package org.apache.hadoop.ozone.om.helpers;
import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.UnknownPipelineStateException;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
import org.apache.hadoop.security.token.Token;
@@ -35,15 +37,20 @@ public final class OmKeyLocationInfo {
// the version number indicating when this block was added
private long createVersion;
- private OmKeyLocationInfo(BlockID blockID, long length, long offset) {
+ private Pipeline pipeline;
+
+ private OmKeyLocationInfo(BlockID blockID, Pipeline pipeline, long length,
+ long offset) {
this.blockID = blockID;
+ this.pipeline = pipeline;
this.length = length;
this.offset = offset;
}
- private OmKeyLocationInfo(BlockID blockID, long length, long offset,
- Token<OzoneBlockTokenIdentifier> token) {
+ private OmKeyLocationInfo(BlockID blockID, Pipeline pipeline, long length,
+ long offset, Token<OzoneBlockTokenIdentifier> token) {
this.blockID = blockID;
+ this.pipeline = pipeline;
this.length = length;
this.offset = offset;
this.token = token;
@@ -69,6 +76,10 @@ public final class OmKeyLocationInfo {
return blockID.getLocalID();
}
+ public Pipeline getPipeline() {
+ return pipeline;
+ }
+
public long getLength() {
return length;
}
@@ -92,6 +103,11 @@ public final class OmKeyLocationInfo {
public void setToken(Token<OzoneBlockTokenIdentifier> token) {
this.token = token;
}
+
+ public void setPipeline(Pipeline pipeline) {
+ this.pipeline = pipeline;
+ }
+
/**
* Builder of OmKeyLocationInfo.
*/
@@ -100,12 +116,18 @@ public final class OmKeyLocationInfo {
private long length;
private long offset;
private Token<OzoneBlockTokenIdentifier> token;
+ private Pipeline pipeline;
public Builder setBlockID(BlockID blockId) {
this.blockID = blockId;
return this;
}
+ public Builder setPipeline(Pipeline pipeline) {
+ this.pipeline = pipeline;
+ return this;
+ }
+
public Builder setLength(long len) {
this.length = len;
return this;
@@ -123,9 +145,9 @@ public final class OmKeyLocationInfo {
public OmKeyLocationInfo build() {
if (token == null) {
- return new OmKeyLocationInfo(blockID, length, offset);
+ return new OmKeyLocationInfo(blockID, pipeline, length, offset);
} else {
- return new OmKeyLocationInfo(blockID, length, offset, token);
+ return new OmKeyLocationInfo(blockID, pipeline, length, offset, token);
}
}
}
@@ -139,12 +161,27 @@ public final class OmKeyLocationInfo {
if (this.token != null) {
builder.setToken(this.token.toTokenProto());
}
+ try {
+ builder.setPipeline(pipeline.getProtobufMessage());
+ } catch (UnknownPipelineStateException e) {
+ //TODO: fix me: we should not return KeyLocation without pipeline.
+ }
return builder.build();
}
+ private static Pipeline getPipeline(KeyLocation keyLocation) {
+ try {
+ return keyLocation.hasPipeline() ?
+ Pipeline.getFromProtobuf(keyLocation.getPipeline()) : null;
+ } catch (UnknownPipelineStateException e) {
+ return null;
+ }
+ }
+
public static OmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) {
OmKeyLocationInfo info = new OmKeyLocationInfo(
BlockID.getFromProtobuf(keyLocation.getBlockID()),
+ getPipeline(keyLocation),
keyLocation.getLength(),
keyLocation.getOffset());
if(keyLocation.hasToken()) {
@@ -161,6 +198,7 @@ public final class OmKeyLocationInfo {
", length=" + length +
", offset=" + offset +
", token=" + token +
+ ", pipeline=" + pipeline +
", createVersion=" + createVersion + '}';
}
}
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index 0e94aa6..2f9410b 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -495,6 +495,13 @@ message KeyLocation {
// indicated at which version this block gets created.
optional uint64 createVersion = 5;
optional hadoop.common.TokenProto token = 6;
+ // Workaround to include pipeline info for client read/write
+ // without talking to SCM.
+ // NOTE: the pipeline info may change after the pipeline is closed,
+ // so eventually we will have to go back to calling SCM to get the
+ // up-to-date pipeline information. This will require o3fs to provide
+ // not only an OM delegation token but also an SCM delegation token.
+ optional hadoop.hdds.Pipeline pipeline = 7;
}
message KeyLocationList {
diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index 6fff3e4..07016eb 100644
--- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -446,7 +446,6 @@ public final class DistributedStorageHandler implements StorageHandler {
new KeyOutputStream.Builder()
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
- .setScmClient(storageContainerLocationClient)
.setOmClient(ozoneManagerClient)
.setChunkSize(chunkSize)
.setRequestID(args.getRequestID())
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index fbbb104..38d0922 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
@@ -95,6 +96,8 @@ import static org.apache.hadoop.util.Time.monotonicNow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.ws.rs.HEAD;
+
/**
* Implementation of keyManager.
*/
@@ -105,7 +108,7 @@ public class KeyManagerImpl implements KeyManager {
/**
* A SCM block client, used to talk to SCM to allocate block during putKey.
*/
- private final ScmBlockLocationProtocol scmBlockClient;
+ private final ScmClient scmClient;
private final OMMetadataManager metadataManager;
private final long scmBlockSize;
private final boolean useRatis;
@@ -122,14 +125,15 @@ public class KeyManagerImpl implements KeyManager {
public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
OzoneBlockTokenSecretManager secretManager) {
- this(scmBlockClient, metadataManager, conf, omId, secretManager, null);
+ this(new ScmClient(scmBlockClient, null), metadataManager,
+ conf, omId, secretManager, null);
}
- public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
+ public KeyManagerImpl(ScmClient scmClient,
OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
OzoneBlockTokenSecretManager secretManager,
KeyProviderCryptoExtension kmsProvider) {
- this.scmBlockClient = scmBlockClient;
+ this.scmClient = scmClient;
this.metadataManager = metadataManager;
this.scmBlockSize = (long) conf
.getStorageSize(OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT,
@@ -159,7 +163,7 @@ public class KeyManagerImpl implements KeyManager {
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
- keyDeletingService = new KeyDeletingService(scmBlockClient, this,
+ keyDeletingService = new KeyDeletingService(scmClient.getBlockClient(), this,
blockDeleteInterval, serviceTimeout, configuration);
keyDeletingService.start();
}
@@ -269,7 +273,7 @@ public class KeyManagerImpl implements KeyManager {
String remoteUser = getRemoteUser().getShortUserName();
List<AllocatedBlock> allocatedBlocks;
try {
- allocatedBlocks = scmBlockClient
+ allocatedBlocks = scmClient.getBlockClient()
.allocateBlock(scmBlockSize, numBlocks, keyInfo.getType(),
keyInfo.getFactor(), omId, excludeList);
} catch (SCMException ex) {
@@ -283,7 +287,8 @@ public class KeyManagerImpl implements KeyManager {
OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder()
.setBlockID(new BlockID(allocatedBlock.getBlockID()))
.setLength(scmBlockSize)
- .setOffset(0);
+ .setOffset(0)
+ .setPipeline(allocatedBlock.getPipeline());
if (grpcBlockTokenEnabled) {
builder.setToken(secretManager
.generateToken(remoteUser, allocatedBlock.getBlockID().toString(),
@@ -575,6 +580,33 @@ public class KeyManagerImpl implements KeyManager {
});
}
}
+ // Refresh container pipeline info from SCM
+ // based on OmKeyArgs.refreshPipeline flag
+ // 1. Client sends the initial read request with refreshPipeline = false
+ // and uses the pipeline cached in OM to access the datanode.
+ // 2. If that succeeds, done.
+ // 3. If it fails because the pipeline does not exist or because of an
+ // invalid pipeline state exception, the client should retry lookupKey
+ // with OmKeyArgs.refreshPipeline = true.
+ if (args.getRefreshPipeline()) {
+ for (OmKeyLocationInfoGroup key : value.getKeyLocationVersions()) {
+ key.getLocationList().forEach(k -> {
+ // TODO: fix some tests that may not initialize the container client.
+ // Production code should always have containerClient initialized.
+ if (scmClient.getContainerClient() != null) {
+ try {
+ ContainerWithPipeline cp = scmClient.getContainerClient()
+ .getContainerWithPipeline(k.getContainerID());
+ if (!cp.getPipeline().equals(k.getPipeline())) {
+ k.setPipeline(cp.getPipeline());
+ }
+ } catch (IOException e) {
+ LOG.debug("Unable to update pipeline for container");
+ }
+ }
+ });
+ }
+ }
return value;
} catch (IOException ex) {
LOG.debug("Get key failed for volume:{} bucket:{} key:{}",
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 0b77307..43ad16a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -334,7 +334,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
omRpcServer = getRpcServer(conf);
omRpcAddress = updateRPCListenAddress(configuration,
OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
- keyManager = new KeyManagerImpl(scmBlockClient, metadataManager,
+ keyManager = new KeyManagerImpl(
+ new ScmClient(scmBlockClient, scmContainerClient), metadataManager,
configuration, omStorage.getOmId(), blockTokenMgr, getKmsProvider());
shutdownHook = () -> {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
new file mode 100644
index 0000000..73a0cf9
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+
+/**
+ * Wrapper class for Scm protocol clients.
+ */
+public class ScmClient {
+
+ private ScmBlockLocationProtocol blockClient;
+ private StorageContainerLocationProtocol containerClient;
+
+ ScmClient(ScmBlockLocationProtocol blockClient,
+ StorageContainerLocationProtocol containerClient) {
+ this.containerClient = containerClient;
+ this.blockClient = blockClient;
+ }
+
+ ScmBlockLocationProtocol getBlockClient() {
+ return this.blockClient;
+ }
+
+ StorageContainerLocationProtocol getContainerClient() {
+ return this.containerClient;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org