You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bo...@apache.org on 2018/08/02 17:17:51 UTC
[28/50] [abbrv] hadoop git commit: HDDS-226. Client should update
block length in OM while committing the key. Contributed by Shashikant
Banerjee.
HDDS-226. Client should update block length in OM while committing the key. Contributed by Shashikant Banerjee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f4db753b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f4db753b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f4db753b
Branch: refs/heads/YARN-7402
Commit: f4db753bb6b4648c583722dbe8108973c23ba06f
Parents: 6310c0d
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Wed Aug 1 09:02:43 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Wed Aug 1 09:03:00 2018 +0530
----------------------------------------------------------------------
.../ozone/client/io/ChunkGroupOutputStream.java | 22 +++++++++++-
.../hadoop/ozone/om/helpers/OmKeyArgs.java | 26 ++++++++++++---
.../hadoop/ozone/om/helpers/OmKeyInfo.java | 29 ++++++++++++++--
.../ozone/om/helpers/OmKeyLocationInfo.java | 6 +++-
...neManagerProtocolClientSideTranslatorPB.java | 8 ++++-
.../src/main/proto/OzoneManagerProtocol.proto | 1 +
.../ozone/client/rpc/TestOzoneRpcClient.java | 35 ++++++++++++++++++++
.../hadoop/ozone/om/TestOmBlockVersioning.java | 13 +++++++-
.../apache/hadoop/ozone/om/KeyManagerImpl.java | 4 +++
...neManagerProtocolServerSideTranslatorPB.java | 5 ++-
10 files changed, 138 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4db753b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
index 9443317..83b4dfd 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java
@@ -76,7 +76,7 @@ public class ChunkGroupOutputStream extends OutputStream {
private final int chunkSize;
private final String requestID;
private boolean closed;
-
+ private List<OmKeyLocationInfo> locationInfoList;
/**
* A constructor for testing purpose only.
*/
@@ -91,6 +91,7 @@ public class ChunkGroupOutputStream extends OutputStream {
chunkSize = 0;
requestID = null;
closed = false;
+ locationInfoList = null;
}
/**
@@ -133,6 +134,7 @@ public class ChunkGroupOutputStream extends OutputStream {
this.xceiverClientManager = xceiverClientManager;
this.chunkSize = chunkSize;
this.requestID = requestId;
+ this.locationInfoList = new ArrayList<>();
LOG.debug("Expecting open key with one block, but got" +
info.getKeyLocationVersions().size());
}
@@ -196,8 +198,19 @@ public class ChunkGroupOutputStream extends OutputStream {
streamEntries.add(new ChunkOutputStreamEntry(subKeyInfo.getBlockID(),
keyArgs.getKeyName(), xceiverClientManager, xceiverClient, requestID,
chunkSize, subKeyInfo.getLength()));
+ // reset the original length to zero here. It will be updated as and when
+ // the data gets written.
+ subKeyInfo.setLength(0);
+ locationInfoList.add(subKeyInfo);
}
+ // Adds the bytes just written to the tracked length of the block at index.
+ private void incrementBlockLength(int index, long length) {
+ if (locationInfoList != null) {
+ OmKeyLocationInfo blockInfo = locationInfoList.get(index);
+ blockInfo.setLength(blockInfo.getLength() + length);
+ }
+ }
@VisibleForTesting
public long getByteOffset() {
@@ -222,6 +235,7 @@ public class ChunkGroupOutputStream extends OutputStream {
}
ChunkOutputStreamEntry entry = streamEntries.get(currentStreamIndex);
entry.write(b);
+ incrementBlockLength(currentStreamIndex, 1);
if (entry.getRemaining() <= 0) {
currentStreamIndex += 1;
}
@@ -276,6 +290,7 @@ public class ChunkGroupOutputStream extends OutputStream {
ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex);
int writeLen = Math.min(len, (int)current.getRemaining());
current.write(b, off, writeLen);
+ incrementBlockLength(currentStreamIndex, writeLen);
if (current.getRemaining() <= 0) {
currentStreamIndex += 1;
}
@@ -328,8 +343,13 @@ public class ChunkGroupOutputStream extends OutputStream {
}
if (keyArgs != null) {
// in test, this could be null
+ long length =
+ locationInfoList.parallelStream().mapToLong(e -> e.getLength()).sum();
+ Preconditions.checkState(byteOffset == length);
keyArgs.setDataSize(byteOffset);
+ keyArgs.setLocationInfoList(locationInfoList);
omClient.commitKey(keyArgs, openID);
+ locationInfoList = null;
} else {
LOG.warn("Closing ChunkGroupOutputStream, but key args is null");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4db753b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
index 1f8ed5f..aab35c5 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyArgs.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.om.helpers;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import java.util.List;
+
/**
* Args for key. Client use this to specify key's attributes on key creation
* (putKey()).
@@ -30,15 +32,18 @@ public final class OmKeyArgs {
private long dataSize;
private final ReplicationType type;
private final ReplicationFactor factor;
+ private List<OmKeyLocationInfo> locationInfoList;
private OmKeyArgs(String volumeName, String bucketName, String keyName,
- long dataSize, ReplicationType type, ReplicationFactor factor) {
+ long dataSize, ReplicationType type, ReplicationFactor factor,
+ List<OmKeyLocationInfo> locationInfoList) {
this.volumeName = volumeName;
this.bucketName = bucketName;
this.keyName = keyName;
this.dataSize = dataSize;
this.type = type;
this.factor = factor;
+ this.locationInfoList = locationInfoList;
}
public ReplicationType getType() {
@@ -69,6 +74,14 @@ public final class OmKeyArgs {
dataSize = size;
}
+ public void setLocationInfoList(List<OmKeyLocationInfo> locationInfoList) {
+ this.locationInfoList = locationInfoList; // keeps the caller's list, no copy
+ }
+
+ public List<OmKeyLocationInfo> getLocationInfoList() {
+ return locationInfoList; // null unless set through the builder or the setter
+ }
+
/**
* Builder class of OmKeyArgs.
*/
@@ -79,7 +92,7 @@ public final class OmKeyArgs {
private long dataSize;
private ReplicationType type;
private ReplicationFactor factor;
-
+ private List<OmKeyLocationInfo> locationInfoList;
public Builder setVolumeName(String volume) {
this.volumeName = volume;
@@ -111,9 +124,14 @@ public final class OmKeyArgs {
return this;
}
+ public Builder setLocationInfoList(List<OmKeyLocationInfo> locationInfos) {
+ this.locationInfoList = locationInfos; // handed to OmKeyArgs as-is by build()
+ return this;
+ }
+
public OmKeyArgs build() {
- return new OmKeyArgs(volumeName, bucketName, keyName, dataSize,
- type, factor);
+ return new OmKeyArgs(volumeName, bucketName, keyName, dataSize, type,
+ factor, locationInfoList);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4db753b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
index 05c8d45..3603964 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyInfo.java
@@ -101,8 +101,7 @@ public final class OmKeyInfo {
this.dataSize = size;
}
- public synchronized OmKeyLocationInfoGroup getLatestVersionLocations()
- throws IOException {
+ public synchronized OmKeyLocationInfoGroup getLatestVersionLocations() {
return keyLocationVersions.size() == 0? null :
keyLocationVersions.get(keyLocationVersions.size() - 1);
}
@@ -116,6 +115,32 @@ public final class OmKeyInfo {
}
/**
+ * Updates the length of each block of the latest version from the given list.
+ * This will be called when the key is being committed to OzoneManager.
+ *
+ * @param locationInfoList blocks, matched by BlockID, carrying the new lengths
+ * @throws NullPointerException if this key has no location version yet
+ */
+ public void updateLocationInfoList(List<OmKeyLocationInfo> locationInfoList) {
+ OmKeyLocationInfoGroup keyLocationInfoGroup = getLatestVersionLocations();
+ // validate before dereferencing: the getter returns null when no version exists
+ Preconditions.checkNotNull(keyLocationInfoGroup);
+ List<OmKeyLocationInfo> currentList = keyLocationInfoGroup.getLocationList();
+ Preconditions.checkState(locationInfoList.size() <= currentList.size());
+ for (OmKeyLocationInfo current : currentList) {
+ // For Versioning, while committing the key for the newer version,
+ // we just need to update the lengths for new blocks. Need to iterate over
+ // and find the new blocks added in the latest version.
+ for (OmKeyLocationInfo info : locationInfoList) {
+ if (info.getBlockID().equals(current.getBlockID())) {
+ current.setLength(info.getLength());
+ break;
+ }
+ }
+ }
+ }
+
+ /**
* Append a set of blocks to the latest version. Note that these blocks are
* part of the latest version, not a new version.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4db753b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java
index 3f6666d..fae92f8 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmKeyLocationInfo.java
@@ -27,7 +27,7 @@ public final class OmKeyLocationInfo {
private final BlockID blockID;
private final boolean shouldCreateContainer;
// the id of this subkey in all the subkeys.
- private final long length;
+ private long length;
private final long offset;
// the version number indicating when this block was added
private long createVersion;
@@ -68,6 +68,10 @@ public final class OmKeyLocationInfo {
return length;
}
+ public void setLength(long length) {
+ this.length = length; // overwrites the stored length; no validation is done
+ }
+
public long getOffset() {
return offset;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4db753b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 37151fb..e557ac5 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone.om.protocolPB;
+import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.protobuf.RpcController;
@@ -581,11 +582,16 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
public void commitKey(OmKeyArgs args, int clientID)
throws IOException {
CommitKeyRequest.Builder req = CommitKeyRequest.newBuilder();
+ List<OmKeyLocationInfo> locationInfoList = args.getLocationInfoList();
+ Preconditions.checkNotNull(locationInfoList);
KeyArgs keyArgs = KeyArgs.newBuilder()
.setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName())
.setKeyName(args.getKeyName())
- .setDataSize(args.getDataSize()).build();
+ .setDataSize(args.getDataSize())
+ .addAllKeyLocations(
+ locationInfoList.stream().map(OmKeyLocationInfo::getProtobuf)
+ .collect(Collectors.toList())).build();
req.setKeyArgs(keyArgs);
req.setClientID(clientID);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4db753b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index 36b1c83..51a0a7f 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -234,6 +234,7 @@ message KeyArgs {
optional uint64 dataSize = 4;
optional hadoop.hdds.ReplicationType type = 5;
optional hadoop.hdds.ReplicationFactor factor = 6;
+ repeated KeyLocation keyLocations = 7;
}
message KeyLocation {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4db753b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
index 2fbab36..e31b528 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.client.rpc;
import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -434,6 +435,40 @@ public class TestOzoneRpcClient {
}
@Test
+ public void testValidateBlockLengthWithCommitKey() throws IOException {
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+ // at least 1 char: an empty value presumably allocates no block at all
+ String value = RandomStringUtils.random(RandomUtils.nextInt(1, 1024));
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ volume.createBucket(bucketName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+ String keyName = UUID.randomUUID().toString();
+
+ // create the initial key with size 0, write will allocate the first block.
+ OzoneOutputStream out = bucket.createKey(keyName, 0,
+ ReplicationType.STAND_ALONE, ReplicationFactor.ONE);
+ out.write(value.getBytes());
+ out.close();
+ OmKeyArgs.Builder builder = new OmKeyArgs.Builder();
+ builder.setVolumeName(volumeName).setBucketName(bucketName)
+ .setKeyName(keyName);
+ OmKeyInfo keyInfo = ozoneManager.lookupKey(builder.build());
+
+ List<OmKeyLocationInfo> locationInfoList =
+ keyInfo.getLatestVersionLocations().getBlocksLatestVersionOnly();
+ // LocationList should have only 1 block
+ Assert.assertEquals(1, locationInfoList.size());
+ // make sure the data block size is updated
+ Assert.assertEquals(value.getBytes().length,
+ locationInfoList.get(0).getLength());
+ // make sure the total data size is set correctly
+ Assert.assertEquals(value.getBytes().length, keyInfo.getDataSize());
+ }
+
+
+ @Test
public void testPutKeyRatisOneNode()
throws IOException, OzoneException {
String volumeName = UUID.randomUUID().toString();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4db753b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
index 15122b9..f5dddee 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmBlockVersioning.java
@@ -44,6 +44,7 @@ import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -122,6 +123,9 @@ public class TestOmBlockVersioning {
// 1st update, version 0
OpenKeySession openKey = ozoneManager.openKey(keyArgs);
+ // explicitly set the keyLocation list before committing the key.
+ keyArgs.setLocationInfoList(
+ openKey.getKeyInfo().getLatestVersionLocations().getLocationList());
ozoneManager.commitKey(keyArgs, openKey.getId());
OmKeyInfo keyInfo = ozoneManager.lookupKey(keyArgs);
@@ -134,6 +138,9 @@ public class TestOmBlockVersioning {
openKey = ozoneManager.openKey(keyArgs);
//OmKeyLocationInfo locationInfo =
// ozoneManager.allocateBlock(keyArgs, openKey.getId());
+ // explicitly set the keyLocation list before committing the key.
+ keyArgs.setLocationInfoList(
+ openKey.getKeyInfo().getLatestVersionLocations().getLocationList());
ozoneManager.commitKey(keyArgs, openKey.getId());
keyInfo = ozoneManager.lookupKey(keyArgs);
@@ -144,7 +151,11 @@ public class TestOmBlockVersioning {
// 3rd update, version 2
openKey = ozoneManager.openKey(keyArgs);
// this block will be appended to the latest version of version 2.
- ozoneManager.allocateBlock(keyArgs, openKey.getId());
+ OmKeyLocationInfo locationInfo =
+ ozoneManager.allocateBlock(keyArgs, openKey.getId());
+ List<OmKeyLocationInfo> locationInfoList = new ArrayList<>();
+ locationInfoList.add(locationInfo);
+ keyArgs.setLocationInfoList(locationInfoList);
ozoneManager.commitKey(keyArgs, openKey.getId());
keyInfo = ozoneManager.lookupKey(keyArgs);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4db753b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index ba92a29..75342c6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -342,6 +342,10 @@ public class KeyManagerImpl implements KeyManager {
OmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(openKeyData));
keyInfo.setDataSize(args.getDataSize());
keyInfo.setModificationTime(Time.now());
+ List<OmKeyLocationInfo> locationInfoList = args.getLocationInfoList();
+ Preconditions.checkNotNull(locationInfoList);
+ //update the block length for each block
+ keyInfo.updateLocationInfoList(locationInfoList);
BatchOperation batch = new BatchOperation();
batch.delete(openKey);
batch.put(objectKeyBytes, keyInfo.getProtobuf().toByteArray());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4db753b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 40a88b6..45ec2d0 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -519,9 +519,12 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
- .setDataSize(keyArgs.getDataSize())
+ .setLocationInfoList(keyArgs.getKeyLocationsList().stream()
+ .map(OmKeyLocationInfo::getFromProtobuf)
+ .collect(Collectors.toList()))
.setType(type)
.setFactor(factor)
+ .setDataSize(keyArgs.getDataSize())
.build();
int id = request.getClientID();
impl.commitKey(omKeyArgs, id);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org