You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2019/03/15 17:41:11 UTC

[hadoop] branch trunk updated: HDDS-1138. Ozone Client should avoid talking to SCM directly. Contributed by Xiaoyu Yao and Mukul Kumar Singh.

This is an automated email from the ASF dual-hosted git repository.

xyao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d1afa03  HDDS-1138. Ozone Client should avoid talking to SCM directly. Contributed by Xiaoyu Yao and Mukul Kumar Singh.
d1afa03 is described below

commit d1afa038044f6aaec43551571caae567b2bf4139
Author: Xiaoyu Yao <xy...@apache.org>
AuthorDate: Fri Mar 15 10:41:06 2019 -0700

    HDDS-1138. Ozone Client should avoid talking to SCM directly. Contributed by Xiaoyu Yao and Mukul Kumar Singh.
    
    Closes #585
---
 .../hadoop/ozone/client/io/KeyInputStream.java     |  5 +--
 .../hadoop/ozone/client/io/KeyOutputStream.java    | 20 ++-------
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  2 -
 .../apache/hadoop/ozone/om/helpers/OmKeyArgs.java  | 16 +++++++-
 .../hadoop/ozone/om/helpers/OmKeyLocationInfo.java | 48 +++++++++++++++++++---
 .../src/main/proto/OzoneManagerProtocol.proto      |  7 ++++
 .../web/storage/DistributedStorageHandler.java     |  1 -
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 46 +++++++++++++++++----
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  3 +-
 .../java/org/apache/hadoop/ozone/om/ScmClient.java | 44 ++++++++++++++++++++
 10 files changed, 154 insertions(+), 38 deletions(-)

diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index d8bfdbd..3a92e01 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -277,9 +276,7 @@ public class KeyInputStream extends InputStream implements Seekable {
       OmKeyLocationInfo omKeyLocationInfo = keyLocationInfos.get(i);
       BlockID blockID = omKeyLocationInfo.getBlockID();
       long containerID = blockID.getContainerID();
-      ContainerWithPipeline containerWithPipeline =
-          storageContainerLocationClient.getContainerWithPipeline(containerID);
-      Pipeline pipeline = containerWithPipeline.getPipeline();
+      Pipeline pipeline = omKeyLocationInfo.getPipeline();
 
       // irrespective of the container state, we will always read via Standalone
       // protocol.
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index a379889..3bd572d 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -26,8 +26,6 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ChecksumType;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.scm.storage.BufferPool;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
@@ -73,7 +71,6 @@ public class KeyOutputStream extends OutputStream {
   private final ArrayList<BlockOutputStreamEntry> streamEntries;
   private int currentStreamIndex;
   private final OzoneManagerProtocol omClient;
-  private final StorageContainerLocationProtocol scmClient;
   private final OmKeyArgs keyArgs;
   private final long openID;
   private final XceiverClientManager xceiverClientManager;
@@ -100,7 +97,6 @@ public class KeyOutputStream extends OutputStream {
   public KeyOutputStream() {
     streamEntries = new ArrayList<>();
     omClient = null;
-    scmClient = null;
     keyArgs = null;
     openID = -1;
     xceiverClientManager = null;
@@ -136,6 +132,7 @@ public class KeyOutputStream extends OutputStream {
           new OmKeyLocationInfo.Builder().setBlockID(streamEntry.getBlockID())
               .setLength(streamEntry.getCurrentPosition()).setOffset(0)
               .setToken(streamEntry.getToken())
+              .setPipeline(streamEntry.getPipeline())
               .build();
       LOG.debug("block written " + streamEntry.getBlockID() + ", length "
           + streamEntry.getCurrentPosition() + " bcsID "
@@ -149,7 +146,6 @@ public class KeyOutputStream extends OutputStream {
   @SuppressWarnings("parameternumber")
   public KeyOutputStream(OpenKeySession handler,
       XceiverClientManager xceiverClientManager,
-      StorageContainerLocationProtocol scmClient,
       OzoneManagerProtocol omClient, int chunkSize,
       String requestId, ReplicationFactor factor, ReplicationType type,
       long bufferFlushSize, long bufferMaxSize, long size, long watchTimeout,
@@ -158,7 +154,6 @@ public class KeyOutputStream extends OutputStream {
     this.streamEntries = new ArrayList<>();
     this.currentStreamIndex = 0;
     this.omClient = omClient;
-    this.scmClient = scmClient;
     OmKeyInfo info = handler.getKeyInfo();
     // Retrieve the file encryption key info, null if file is not in
     // encrypted bucket.
@@ -221,15 +216,14 @@ public class KeyOutputStream extends OutputStream {
 
   private void addKeyLocationInfo(OmKeyLocationInfo subKeyInfo)
       throws IOException {
-    ContainerWithPipeline containerWithPipeline = scmClient
-        .getContainerWithPipeline(subKeyInfo.getContainerID());
+    Preconditions.checkNotNull(subKeyInfo.getPipeline());
     UserGroupInformation.getCurrentUser().addToken(subKeyInfo.getToken());
     BlockOutputStreamEntry.Builder builder =
         new BlockOutputStreamEntry.Builder()
             .setBlockID(subKeyInfo.getBlockID())
             .setKey(keyArgs.getKeyName())
             .setXceiverClientManager(xceiverClientManager)
-            .setPipeline(containerWithPipeline.getPipeline())
+            .setPipeline(subKeyInfo.getPipeline())
             .setRequestId(requestID)
             .setChunkSize(chunkSize)
             .setLength(subKeyInfo.getLength())
@@ -637,7 +631,6 @@ public class KeyOutputStream extends OutputStream {
   public static class Builder {
     private OpenKeySession openHandler;
     private XceiverClientManager xceiverManager;
-    private StorageContainerLocationProtocol scmClient;
     private OzoneManagerProtocol omClient;
     private int chunkSize;
     private String requestID;
@@ -675,11 +668,6 @@ public class KeyOutputStream extends OutputStream {
       return this;
     }
 
-    public Builder setScmClient(StorageContainerLocationProtocol client) {
-      this.scmClient = client;
-      return this;
-    }
-
     public Builder setOmClient(
         OzoneManagerProtocol client) {
       this.omClient = client;
@@ -747,7 +735,7 @@ public class KeyOutputStream extends OutputStream {
     }
 
     public KeyOutputStream build() throws IOException {
-      return new KeyOutputStream(openHandler, xceiverManager, scmClient,
+      return new KeyOutputStream(openHandler, xceiverManager,
           omClient, chunkSize, requestID, factor, type, streamBufferFlushSize,
           streamBufferMaxSize, blockSize, watchTimeout, checksumType,
           bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey,
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index d059582..83d9ec2 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -602,7 +602,6 @@ public class RpcClient implements ClientProtocol {
         new KeyOutputStream.Builder()
             .setHandler(openKey)
             .setXceiverClientManager(xceiverClientManager)
-            .setScmClient(storageContainerLocationClient)
             .setOmClient(ozoneManagerClient)
             .setChunkSize(chunkSize)
             .setRequestID(requestId)
@@ -865,7 +864,6 @@ public class RpcClient implements ClientProtocol {
         new KeyOutputStream.Builder()
             .setHandler(openKey)
             .setXceiverClientManager(xceiverClientManager)
-            .setScmClient(storageContainerLocationClient)
             .setOmClient(ozoneManagerClient)
             .setChunkSize(chunkSize)
             .setRequestID(requestId)
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
index 9620600..d90345c 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
@@ -44,13 +44,14 @@ public final class OmKeyArgs implements Auditable {
   private final String multipartUploadID;
   private final int multipartUploadPartNumber;
   private Map<String, String> metadata;
+  private boolean refreshPipeline;
 
   @SuppressWarnings("parameternumber")
   private OmKeyArgs(String volumeName, String bucketName, String keyName,
       long dataSize, ReplicationType type, ReplicationFactor factor,
       List<OmKeyLocationInfo> locationInfoList, boolean isMultipart,
       String uploadID, int partNumber,
-      Map<String, String> metadataMap) {
+      Map<String, String> metadataMap, boolean refreshPipeline) {
     this.volumeName = volumeName;
     this.bucketName = bucketName;
     this.keyName = keyName;
@@ -62,6 +63,7 @@ public final class OmKeyArgs implements Auditable {
     this.multipartUploadID = uploadID;
     this.multipartUploadPartNumber = partNumber;
     this.metadata = metadataMap;
+    this.refreshPipeline = refreshPipeline;
   }
 
   public boolean getIsMultipartKey() {
@@ -120,6 +122,10 @@ public final class OmKeyArgs implements Auditable {
     return locationInfoList;
   }
 
+  public boolean getRefreshPipeline() {
+    return refreshPipeline;
+  }
+
   @Override
   public Map<String, String> toAuditMap() {
     Map<String, String> auditMap = new LinkedHashMap<>();
@@ -159,6 +165,7 @@ public final class OmKeyArgs implements Auditable {
     private String multipartUploadID;
     private int multipartUploadPartNumber;
     private Map<String, String> metadata = new HashMap<>();
+    private boolean refreshPipeline;
 
     public Builder setVolumeName(String volume) {
       this.volumeName = volume;
@@ -220,10 +227,15 @@ public final class OmKeyArgs implements Auditable {
       return this;
     }
 
+    public Builder setRefreshPipeline(boolean refresh) {
+      this.refreshPipeline = refresh;
+      return this;
+    }
+
     public OmKeyArgs build() {
       return new OmKeyArgs(volumeName, bucketName, keyName, dataSize, type,
           factor, locationInfoList, isMultipartKey, multipartUploadID,
-          multipartUploadPartNumber, metadata);
+          multipartUploadPartNumber, metadata, refreshPipeline);
     }
 
   }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java
index e4eb0f5..bbd1157 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java
@@ -17,6 +17,8 @@
 package org.apache.hadoop.ozone.om.helpers;
 
 import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.UnknownPipelineStateException;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyLocation;
 import org.apache.hadoop.security.token.Token;
@@ -35,15 +37,20 @@ public final class OmKeyLocationInfo {
   // the version number indicating when this block was added
   private long createVersion;
 
-  private OmKeyLocationInfo(BlockID blockID, long length, long offset) {
+  private Pipeline pipeline;
+
+  private OmKeyLocationInfo(BlockID blockID, Pipeline pipeline, long length,
+                            long offset) {
     this.blockID = blockID;
+    this.pipeline = pipeline;
     this.length = length;
     this.offset = offset;
   }
 
-  private OmKeyLocationInfo(BlockID blockID, long length, long offset,
-      Token<OzoneBlockTokenIdentifier> token) {
+  private OmKeyLocationInfo(BlockID blockID, Pipeline pipeline, long length,
+      long offset, Token<OzoneBlockTokenIdentifier> token) {
     this.blockID = blockID;
+    this.pipeline = pipeline;
     this.length = length;
     this.offset = offset;
     this.token = token;
@@ -69,6 +76,10 @@ public final class OmKeyLocationInfo {
     return blockID.getLocalID();
   }
 
+  public Pipeline getPipeline() {
+    return pipeline;
+  }
+
   public long getLength() {
     return length;
   }
@@ -92,6 +103,11 @@ public final class OmKeyLocationInfo {
   public void setToken(Token<OzoneBlockTokenIdentifier> token) {
     this.token = token;
   }
+
+  public void setPipeline(Pipeline pipeline) {
+    this.pipeline = pipeline;
+  }
+
   /**
    * Builder of OmKeyLocationInfo.
    */
@@ -100,12 +116,18 @@ public final class OmKeyLocationInfo {
     private long length;
     private long offset;
     private Token<OzoneBlockTokenIdentifier> token;
+    private Pipeline pipeline;
 
     public Builder setBlockID(BlockID blockId) {
       this.blockID = blockId;
       return this;
     }
 
+    public Builder setPipeline(Pipeline pipeline) {
+      this.pipeline = pipeline;
+      return this;
+    }
+
     public Builder setLength(long len) {
       this.length = len;
       return this;
@@ -123,9 +145,9 @@ public final class OmKeyLocationInfo {
 
     public OmKeyLocationInfo build() {
       if (token == null) {
-        return new OmKeyLocationInfo(blockID, length, offset);
+        return new OmKeyLocationInfo(blockID, pipeline, length, offset);
       } else {
-        return new OmKeyLocationInfo(blockID, length, offset, token);
+        return new OmKeyLocationInfo(blockID, pipeline, length, offset, token);
       }
     }
   }
@@ -139,12 +161,27 @@ public final class OmKeyLocationInfo {
     if (this.token != null) {
       builder.setToken(this.token.toTokenProto());
     }
+    try {
+      builder.setPipeline(pipeline.getProtobufMessage());
+    } catch (UnknownPipelineStateException e) {
+      //TODO: fix me: we should not return KeyLocation without pipeline.
+    }
     return builder.build();
   }
 
+  private static Pipeline getPipeline(KeyLocation keyLocation) {
+    try {
+      return keyLocation.hasPipeline() ?
+          Pipeline.getFromProtobuf(keyLocation.getPipeline()) : null;
+    } catch (UnknownPipelineStateException e) {
+      return null;
+    }
+  }
+
   public static OmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) {
     OmKeyLocationInfo info = new OmKeyLocationInfo(
         BlockID.getFromProtobuf(keyLocation.getBlockID()),
+        getPipeline(keyLocation),
         keyLocation.getLength(),
         keyLocation.getOffset());
     if(keyLocation.hasToken()) {
@@ -161,6 +198,7 @@ public final class OmKeyLocationInfo {
         ", length=" + length +
         ", offset=" + offset +
         ", token=" + token +
+        ", pipeline=" + pipeline +
         ", createVersion=" + createVersion + '}';
   }
 }
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index 0e94aa6..2f9410b 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -495,6 +495,13 @@ message KeyLocation {
     // indicated at which version this block gets created.
     optional uint64 createVersion = 5;
     optional hadoop.common.TokenProto token = 6;
+    // Walk around to include pipeline info for client read/write
+    // without talking to scm.
+    // NOTE: the pipeline info may change after pipeline close.
+    // So eventually, we will have to change back to call scm to
+    // get the up to date pipeline information. This will need o3fs
+    // provide not only a OM delegation token but also a SCM delegation token
+    optional hadoop.hdds.Pipeline pipeline = 7;
 }
 
 message KeyLocationList {
diff --git a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index 6fff3e4..07016eb 100644
--- a/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-ozone/objectstore-service/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -446,7 +446,6 @@ public final class DistributedStorageHandler implements StorageHandler {
         new KeyOutputStream.Builder()
             .setHandler(openKey)
             .setXceiverClientManager(xceiverClientManager)
-            .setScmClient(storageContainerLocationClient)
             .setOmClient(ozoneManagerClient)
             .setChunkSize(chunkSize)
             .setRequestID(args.getRequestID())
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index fbbb104..38d0922 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
@@ -95,6 +96,8 @@ import static org.apache.hadoop.util.Time.monotonicNow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.HEAD;
+
 /**
  * Implementation of keyManager.
  */
@@ -105,7 +108,7 @@ public class KeyManagerImpl implements KeyManager {
   /**
    * A SCM block client, used to talk to SCM to allocate block during putKey.
    */
-  private final ScmBlockLocationProtocol scmBlockClient;
+  private final ScmClient scmClient;
   private final OMMetadataManager metadataManager;
   private final long scmBlockSize;
   private final boolean useRatis;
@@ -122,14 +125,15 @@ public class KeyManagerImpl implements KeyManager {
   public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
       OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
       OzoneBlockTokenSecretManager secretManager) {
-    this(scmBlockClient, metadataManager, conf, omId, secretManager, null);
+    this(new ScmClient(scmBlockClient, null), metadataManager,
+        conf, omId, secretManager, null);
   }
 
-  public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
+  public KeyManagerImpl(ScmClient scmClient,
       OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
       OzoneBlockTokenSecretManager secretManager,
       KeyProviderCryptoExtension kmsProvider) {
-    this.scmBlockClient = scmBlockClient;
+    this.scmClient = scmClient;
     this.metadataManager = metadataManager;
     this.scmBlockSize = (long) conf
         .getStorageSize(OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT,
@@ -159,7 +163,7 @@ public class KeyManagerImpl implements KeyManager {
           OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
           OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
           TimeUnit.MILLISECONDS);
-      keyDeletingService = new KeyDeletingService(scmBlockClient, this,
+      keyDeletingService = new KeyDeletingService(scmClient.getBlockClient(), this,
           blockDeleteInterval, serviceTimeout, configuration);
       keyDeletingService.start();
     }
@@ -269,7 +273,7 @@ public class KeyManagerImpl implements KeyManager {
     String remoteUser = getRemoteUser().getShortUserName();
     List<AllocatedBlock> allocatedBlocks;
     try {
-      allocatedBlocks = scmBlockClient
+      allocatedBlocks = scmClient.getBlockClient()
           .allocateBlock(scmBlockSize, numBlocks, keyInfo.getType(),
               keyInfo.getFactor(), omId, excludeList);
     } catch (SCMException ex) {
@@ -283,7 +287,8 @@ public class KeyManagerImpl implements KeyManager {
       OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder()
           .setBlockID(new BlockID(allocatedBlock.getBlockID()))
           .setLength(scmBlockSize)
-          .setOffset(0);
+          .setOffset(0)
+          .setPipeline(allocatedBlock.getPipeline());
       if (grpcBlockTokenEnabled) {
         builder.setToken(secretManager
             .generateToken(remoteUser, allocatedBlock.getBlockID().toString(),
@@ -575,6 +580,33 @@ public class KeyManagerImpl implements KeyManager {
           });
         }
       }
+      // Refresh container pipeline info from SCM
+      // based on OmKeyArgs.refreshPipeline flag
+      // 1. Client send initial read request OmKeyArgs.refreshPipeline = false
+      // and uses the pipeline cached in OM to access datanode
+      // 2. If succeeded, done.
+      // 3. If failed due to pipeline does not exist or invalid pipeline state
+      //    exception, client should retry lookupKey with
+      //    OmKeyArgs.refreshPipeline = true
+      if (args.getRefreshPipeline()) {
+        for (OmKeyLocationInfoGroup key : value.getKeyLocationVersions()) {
+          key.getLocationList().forEach(k -> {
+            // TODO: fix Some tests that may not initialize container client
+            // The production should always have containerClient initialized.
+            if (scmClient.getContainerClient() != null) {
+              try {
+                ContainerWithPipeline cp = scmClient.getContainerClient()
+                    .getContainerWithPipeline(k.getContainerID());
+                if (!cp.getPipeline().equals(k.getPipeline())) {
+                  k.setPipeline(cp.getPipeline());
+                }
+              } catch (IOException e) {
+                LOG.debug("Unable to update pipeline for container");
+              }
+            }
+          });
+        }
+      }
       return value;
     } catch (IOException ex) {
       LOG.debug("Get key failed for volume:{} bucket:{} key:{}",
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 0b77307..43ad16a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -334,7 +334,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     omRpcServer = getRpcServer(conf);
     omRpcAddress = updateRPCListenAddress(configuration,
         OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
-    keyManager = new KeyManagerImpl(scmBlockClient, metadataManager,
+    keyManager = new KeyManagerImpl(
+        new ScmClient(scmBlockClient, scmContainerClient), metadataManager,
         configuration, omStorage.getOmId(), blockTokenMgr, getKmsProvider());
 
     shutdownHook = () -> {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
new file mode 100644
index 0000000..73a0cf9
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ScmClient.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+
+/**
+ * Wrapper class for Scm protocol clients.
+ */
+public class ScmClient {
+
+  private ScmBlockLocationProtocol blockClient;
+  private StorageContainerLocationProtocol containerClient;
+
+  ScmClient(ScmBlockLocationProtocol blockClient,
+            StorageContainerLocationProtocol containerClient) {
+    this.containerClient = containerClient;
+    this.blockClient = blockClient;
+  }
+
+  ScmBlockLocationProtocol getBlockClient() {
+     return this.blockClient;
+  }
+
+  StorageContainerLocationProtocol getContainerClient() {
+    return this.containerClient;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org