You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2017/07/31 23:01:53 UTC
hadoop git commit: HDFS-11920. Ozone : add key partition. Contributed
by Chen Liang.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7240 1f5353d7b -> 42ab44d34
HDFS-11920. Ozone : add key partition. Contributed by Chen Liang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/42ab44d3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/42ab44d3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/42ab44d3
Branch: refs/heads/HDFS-7240
Commit: 42ab44d34e7f8b9e39168ed68a178e5ba68f46b8
Parents: 1f5353d
Author: Chen Liang <cl...@apache.org>
Authored: Mon Jul 31 16:01:39 2017 -0700
Committer: Chen Liang <cl...@apache.org>
Committed: Mon Jul 31 16:01:39 2017 -0700
----------------------------------------------------------------------
.../apache/hadoop/ksm/helpers/KsmKeyInfo.java | 62 ++--
.../hadoop/ksm/helpers/KsmKeyLocationInfo.java | 137 +++++++++
.../main/proto/KeySpaceManagerProtocol.proto | 17 +-
.../apache/hadoop/ozone/OzoneClientImpl.java | 80 +----
.../apache/hadoop/ozone/OzoneConfigKeys.java | 4 +
.../java/org/apache/hadoop/ozone/OzoneKey.java | 43 +--
.../hadoop/ozone/io/OzoneInputStream.java | 5 +-
.../hadoop/ozone/io/OzoneOutputStream.java | 10 +-
.../apache/hadoop/ozone/ksm/KeyManagerImpl.java | 41 ++-
.../hadoop/ozone/ksm/KeySpaceManager.java | 3 +-
.../org/apache/hadoop/ozone/scm/cli/SQLCLI.java | 6 +-
.../hadoop/ozone/web/handlers/KeyHandler.java | 1 +
.../web/storage/ChunkGroupInputStream.java | 211 +++++++++++++
.../web/storage/ChunkGroupOutputStream.java | 304 +++++++++++++++++++
.../web/storage/DistributedStorageHandler.java | 107 +------
.../src/main/resources/ozone-default.xml | 8 +
.../hadoop/ozone/ksm/TestChunkStreams.java | 174 +++++++++++
.../ksm/TestMultipleContainerReadWrite.java | 210 +++++++++++++
18 files changed, 1171 insertions(+), 252 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyInfo.java
index 05748e8..f46ec89 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyInfo.java
@@ -17,9 +17,11 @@
*/
package org.apache.hadoop.ksm.helpers;
-
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
+import java.util.List;
+import java.util.stream.Collectors;
+
/**
* Args for key block. The block instance for the key requested in putKey.
* This is returned from KSM to client, and client use class to talk to
@@ -30,25 +32,19 @@ public final class KsmKeyInfo {
private final String bucketName;
// name of key client specified
private final String keyName;
- private final String containerName;
- // name of the block id SCM assigned for the key
- private final String blockID;
private final long dataSize;
- private final boolean shouldCreateContainer;
+ private List<KsmKeyLocationInfo> keyLocationList;
private final long creationTime;
private final long modificationTime;
private KsmKeyInfo(String volumeName, String bucketName, String keyName,
- long dataSize, String blockID, String containerName,
- boolean shouldCreateContainer, long creationTime,
+ List<KsmKeyLocationInfo> locationInfos, long dataSize, long creationTime,
long modificationTime) {
this.volumeName = volumeName;
this.bucketName = bucketName;
this.keyName = keyName;
- this.containerName = containerName;
- this.blockID = blockID;
this.dataSize = dataSize;
- this.shouldCreateContainer = shouldCreateContainer;
+ this.keyLocationList = locationInfos;
this.creationTime = creationTime;
this.modificationTime = modificationTime;
}
@@ -65,20 +61,12 @@ public final class KsmKeyInfo {
return keyName;
}
- public String getBlockID() {
- return blockID;
- }
-
- public String getContainerName() {
- return containerName;
- }
-
public long getDataSize() {
return dataSize;
}
- public boolean getShouldCreateContainer() {
- return shouldCreateContainer;
+ public List<KsmKeyLocationInfo> getKeyLocationList() {
+ return keyLocationList;
}
public long getCreationTime() {
@@ -96,10 +84,8 @@ public final class KsmKeyInfo {
private String volumeName;
private String bucketName;
private String keyName;
- private String containerName;
- private String blockID;
private long dataSize;
- private boolean shouldCreateContainer;
+ private List<KsmKeyLocationInfo> ksmKeyLocationInfos;
private long creationTime;
private long modificationTime;
@@ -118,13 +104,9 @@ public final class KsmKeyInfo {
return this;
}
- public Builder setBlockID(String block) {
- this.blockID = block;
- return this;
- }
-
- public Builder setContainerName(String container) {
- this.containerName = container;
+ public Builder setKsmKeyLocationInfos(
+ List<KsmKeyLocationInfo> ksmKeyLocationInfoList) {
+ this.ksmKeyLocationInfos = ksmKeyLocationInfoList;
return this;
}
@@ -133,11 +115,6 @@ public final class KsmKeyInfo {
return this;
}
- public Builder setShouldCreateContainer(boolean create) {
- this.shouldCreateContainer = create;
- return this;
- }
-
public Builder setCreationTime(long creationTime) {
this.creationTime = creationTime;
return this;
@@ -150,8 +127,8 @@ public final class KsmKeyInfo {
public KsmKeyInfo build() {
return new KsmKeyInfo(
- volumeName, bucketName, keyName, dataSize, blockID, containerName,
- shouldCreateContainer, creationTime, modificationTime);
+ volumeName, bucketName, keyName, ksmKeyLocationInfos,
+ dataSize, creationTime, modificationTime);
}
}
@@ -161,9 +138,8 @@ public final class KsmKeyInfo {
.setBucketName(bucketName)
.setKeyName(keyName)
.setDataSize(dataSize)
- .setBlockKey(blockID)
- .setContainerName(containerName)
- .setShouldCreateContainer(shouldCreateContainer)
+ .addAllKeyLocationList(keyLocationList.stream()
+ .map(KsmKeyLocationInfo::getProtobuf).collect(Collectors.toList()))
.setCreationTime(creationTime)
.setModificationTime(modificationTime)
.build();
@@ -174,10 +150,10 @@ public final class KsmKeyInfo {
keyInfo.getVolumeName(),
keyInfo.getBucketName(),
keyInfo.getKeyName(),
+ keyInfo.getKeyLocationListList().stream()
+ .map(KsmKeyLocationInfo::getFromProtobuf)
+ .collect(Collectors.toList()),
keyInfo.getDataSize(),
- keyInfo.getBlockKey(),
- keyInfo.getContainerName(),
- keyInfo.getShouldCreateContainer(),
keyInfo.getCreationTime(),
keyInfo.getModificationTime());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyLocationInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyLocationInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyLocationInfo.java
new file mode 100644
index 0000000..62d20f6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ksm/helpers/KsmKeyLocationInfo.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ksm.helpers;
+
+import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyLocation;
+
+/**
+ * One key can be too large to fit in one container, in which case it gets split
+ * into a number of subkeys. This class represents one such subkey instance.
+ */
+public final class KsmKeyLocationInfo {
+ private final String containerName;
+ // name of the block id SCM assigned for the key
+ private final String blockID;
+ private final boolean shouldCreateContainer;
+ // the index of this subkey among all the subkeys of the key.
+ private final int index;
+ private final long length;
+ private final long offset;
+
+ private KsmKeyLocationInfo(String containerName,
+ String blockID, boolean shouldCreateContainer, int index,
+ long length, long offset) {
+ this.containerName = containerName;
+ this.blockID = blockID;
+ this.shouldCreateContainer = shouldCreateContainer;
+ this.index = index;
+ this.length = length;
+ this.offset = offset;
+ }
+
+ public String getContainerName() {
+ return containerName;
+ }
+
+ public String getBlockID() {
+ return blockID;
+ }
+
+ public boolean getShouldCreateContainer() {
+ return shouldCreateContainer;
+ }
+
+ public int getIndex() {
+ return index;
+ }
+
+ public long getLength() {
+ return length;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ /**
+ * Builder of KsmKeyLocationInfo.
+ */
+ public static class Builder {
+ private String containerName;
+ private String blockID;
+ private boolean shouldCreateContainer;
+ // the index of this subkey among all the subkeys of the key.
+ private int index;
+ private long length;
+ private long offset;
+ public Builder setContainerName(String container) {
+ this.containerName = container;
+ return this;
+ }
+
+ public Builder setBlockID(String block) {
+ this.blockID = block;
+ return this;
+ }
+
+ public Builder setShouldCreateContainer(boolean create) {
+ this.shouldCreateContainer = create;
+ return this;
+ }
+
+ public Builder setIndex(int id) {
+ this.index = id;
+ return this;
+ }
+
+ public Builder setLength(long len) {
+ this.length = len;
+ return this;
+ }
+
+ public Builder setOffset(long off) {
+ this.offset = off;
+ return this;
+ }
+
+ public KsmKeyLocationInfo build() {
+ return new KsmKeyLocationInfo(containerName, blockID,
+ shouldCreateContainer, index, length, offset);
+ }
+ }
+
+ public KeyLocation getProtobuf() {
+ return KeyLocation.newBuilder()
+ .setContainerName(containerName)
+ .setBlockID(blockID)
+ .setShouldCreateContainer(shouldCreateContainer)
+ .setIndex(index)
+ .setLength(length)
+ .setOffset(offset)
+ .build();
+ }
+
+ public static KsmKeyLocationInfo getFromProtobuf(KeyLocation keyLocation) {
+ return new KsmKeyLocationInfo(
+ keyLocation.getContainerName(),
+ keyLocation.getBlockID(),
+ keyLocation.getShouldCreateContainer(),
+ keyLocation.getIndex(),
+ keyLocation.getLength(),
+ keyLocation.getOffset());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
index f641898..956e7fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/KeySpaceManagerProtocol.proto
@@ -225,16 +225,23 @@ message KeyArgs {
optional uint64 dataSize = 4;
}
+message KeyLocation {
+ required string blockID = 1;
+ required string containerName = 2;
+ required bool shouldCreateContainer = 3;
+ required uint64 offset = 4;
+ required uint64 length = 5;
+ required uint32 index = 6;
+}
+
message KeyInfo {
required string volumeName = 1;
required string bucketName = 2;
required string keyName = 3;
required uint64 dataSize = 4;
- required string blockKey = 5;
- required string containerName = 6;
- required bool shouldCreateContainer = 7;
- required uint64 creationTime = 8;
- required uint64 modificationTime = 9;
+ repeated KeyLocation keyLocationList = 5;
+ required uint64 creationTime = 6;
+ required uint64 modificationTime = 7;
}
message LocateKeyRequest {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java
index 3c79778..3bd74a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientImpl.java
@@ -20,12 +20,7 @@ package org.apache.hadoop.ozone;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.hdfs.ozone.protocol.proto
- .ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdfs.ozone.protocol.proto
- .ContainerProtos.GetKeyResponseProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto
- .ContainerProtos.KeyData;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
@@ -39,24 +34,20 @@ import org.apache.hadoop.ksm.protocolPB
import org.apache.hadoop.ksm.protocolPB
.KeySpaceManagerProtocolPB;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.web.storage.ChunkGroupInputStream;
import org.apache.hadoop.ozone.io.OzoneInputStream;
import org.apache.hadoop.ozone.io.OzoneOutputStream;
import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts.Versioning;
import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
+import org.apache.hadoop.ozone.web.storage.ChunkGroupOutputStream;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.XceiverClientManager;
-import org.apache.hadoop.scm.XceiverClientSpi;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.protocolPB
.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.scm.protocolPB
.StorageContainerLocationProtocolPB;
-import org.apache.hadoop.scm.storage.ChunkInputStream;
-import org.apache.hadoop.scm.storage.ChunkOutputStream;
-import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -480,39 +471,11 @@ public class OzoneClientImpl implements OzoneClient, Closeable {
.setDataSize(size)
.build();
- String containerKey = buildContainerKey(volumeName, bucketName, keyName);
KsmKeyInfo keyInfo = keySpaceManagerClient.allocateKey(keyArgs);
- // TODO: the following createContainer and key writes may fail, in which
- // case we should revert the above allocateKey to KSM.
- String containerName = keyInfo.getContainerName();
- XceiverClientSpi xceiverClient = getContainer(containerName);
- if (keyInfo.getShouldCreateContainer()) {
- LOG.debug("Need to create container {} for key: {}/{}/{}", containerName,
- volumeName, bucketName, keyName);
- ContainerProtocolCalls.createContainer(xceiverClient, requestId);
- }
- // establish a connection to the container to write the key
- ChunkOutputStream outputStream = new ChunkOutputStream(containerKey,
- keyName, xceiverClientManager, xceiverClient, requestId, chunkSize);
- return new OzoneOutputStream(outputStream);
- }
-
- /**
- * Creates a container key from any number of components by combining all
- * components with a delimiter.
- *
- * @param parts container key components
- * @return container key
- */
- private static String buildContainerKey(String... parts) {
- return '/' + StringUtils.join('/', parts);
- }
-
- private XceiverClientSpi getContainer(String containerName)
- throws IOException {
- Pipeline pipeline =
- storageContainerLocationClient.getContainer(containerName);
- return xceiverClientManager.acquireClient(pipeline);
+ ChunkGroupOutputStream groupOutputStream =
+ ChunkGroupOutputStream.getFromKsmKeyInfo(keyInfo, xceiverClientManager,
+ storageContainerLocationClient, chunkSize, requestId);
+ return new OzoneOutputStream(groupOutputStream);
}
@Override
@@ -529,29 +492,12 @@ public class OzoneClientImpl implements OzoneClient, Closeable {
.setKeyName(keyName)
.build();
KsmKeyInfo keyInfo = keySpaceManagerClient.lookupKey(keyArgs);
- String containerKey = buildContainerKey(volumeName,
- bucketName, keyName);
- String containerName = keyInfo.getContainerName();
- XceiverClientSpi xceiverClient = getContainer(containerName);
- boolean success = false;
- try {
- LOG.debug("get key accessing {} {}",
- xceiverClient.getPipeline().getContainerName(), containerKey);
- KeyData containerKeyData = KeyData.newBuilder().setContainerName(
- xceiverClient.getPipeline().getContainerName())
- .setName(containerKey).build();
- GetKeyResponseProto response = ContainerProtocolCalls
- .getKey(xceiverClient, containerKeyData, requestId);
- List<ChunkInfo> chunks = response.getKeyData().getChunksList();
- success = true;
- return new OzoneInputStream(new ChunkInputStream(
- containerKey, xceiverClientManager, xceiverClient,
- chunks, requestId));
- } finally {
- if (!success) {
- xceiverClientManager.releaseClient(xceiverClient);
- }
- }
+ LengthInputStream lengthInputStream =
+ ChunkGroupInputStream.getFromKsmKeyInfo(
+ keyInfo, xceiverClientManager, storageContainerLocationClient,
+ requestId);
+ return new OzoneInputStream(
+ (ChunkGroupInputStream)lengthInputStream.getWrappedStream());
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 1822a2a..8c7ac7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -73,6 +73,10 @@ public final class OzoneConfigKeys {
public static final String OZONE_KEY_CACHE = "ozone.key.cache.size";
public static final int OZONE_KEY_CACHE_DEFAULT = 1024;
+ public static final String OZONE_SCM_BLOCK_SIZE_KEY =
+ "ozone.scm.block.size";
+ public static final long OZONE_SCM_BLOCK_SIZE_DEFAULT = 256 * OzoneConsts.MB;
+
/**
* Ozone administrator users delimited by comma.
* If not set, only the user who launches an ozone service will be the
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneKey.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneKey.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneKey.java
index a99ba0e..b6b62d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneKey.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneKey.java
@@ -19,6 +19,9 @@
package org.apache.hadoop.ozone;
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ksm.helpers.KsmKeyLocationInfo;
+
+import java.util.List;
/**
* A class that encapsulates OzoneKey.
@@ -38,19 +41,15 @@ public class OzoneKey {
*/
private final String keyName;
/**
- * Name of the Container the Key resides in.
- */
- private final String containerName;
- /**
- * Name of the block id SCM assigned for the key.
- */
- private final String blockID;
- /**
* Size of the data.
*/
private final long dataSize;
/**
+ * All the locations of this key, in an ordered list.
+ */
+ private final List<KsmKeyLocationInfo> keyLocations;
+ /**
* Constructs OzoneKey from KsmKeyInfo.
*
* @param ksmKeyInfo
@@ -59,9 +58,8 @@ public class OzoneKey {
this.volumeName = ksmKeyInfo.getVolumeName();
this.bucketName = ksmKeyInfo.getBucketName();
this.keyName = ksmKeyInfo.getKeyName();
- this.containerName = ksmKeyInfo.getContainerName();
- this.blockID = ksmKeyInfo.getBlockID();
this.dataSize = ksmKeyInfo.getDataSize();
+ this.keyLocations = ksmKeyInfo.getKeyLocationList();
}
/**
@@ -92,29 +90,20 @@ public class OzoneKey {
}
/**
- * Returns Container Name associated with the Key.
- *
- * @return containerName
- */
- public String getContainerName() {
- return containerName;
- }
-
- /**
- * Returns BlockID associated with the Key.
+ * Returns the size of the data.
*
- * @return blockID
+ * @return dataSize
*/
- public String getBlockID() {
- return blockID;
+ public long getDataSize() {
+ return dataSize;
}
/**
- * Returns the size of the data.
+ * Returns the list of the key locations.
*
- * @return dataSize
+ * @return key locations
*/
- public long getDataSize() {
- return dataSize;
+ public List<KsmKeyLocationInfo> getKeyLocations() {
+ return keyLocations;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneInputStream.java
index 0813868..65d5d1b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneInputStream.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.io;
+import org.apache.hadoop.ozone.web.storage.ChunkGroupInputStream;
import org.apache.hadoop.scm.storage.ChunkInputStream;
import java.io.IOException;
@@ -28,14 +29,14 @@ import java.io.InputStream;
*/
public class OzoneInputStream extends InputStream {
- private final ChunkInputStream inputStream;
+ private final ChunkGroupInputStream inputStream;
/**
* Constructs OzoneInputStream with ChunkInputStream.
*
* @param inputStream
*/
- public OzoneInputStream(ChunkInputStream inputStream) {
+ public OzoneInputStream(ChunkGroupInputStream inputStream) {
this.inputStream = inputStream;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneOutputStream.java
index f473292..2421c4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/io/OzoneOutputStream.java
@@ -17,25 +17,25 @@
package org.apache.hadoop.ozone.io;
-import org.apache.hadoop.scm.storage.ChunkOutputStream;
+import org.apache.hadoop.ozone.web.storage.ChunkGroupOutputStream;
import java.io.IOException;
import java.io.OutputStream;
/**
* OzoneOutputStream is used to write data into Ozone.
- * It uses SCM's {@link ChunkOutputStream} for writing the data.
+ * It uses SCM's {@link ChunkGroupOutputStream} for writing the data.
*/
public class OzoneOutputStream extends OutputStream {
- private final ChunkOutputStream outputStream;
+ private final ChunkGroupOutputStream outputStream;
/**
- * Constructs OzoneOutputStream with ChunkOutputStream.
+ * Constructs OzoneOutputStream with ChunkGroupOutputStream.
*
* @param outputStream
*/
- public OzoneOutputStream(ChunkOutputStream outputStream) {
+ public OzoneOutputStream(ChunkGroupOutputStream outputStream) {
this.outputStream = outputStream;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
index d3b7e48..48e1049 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.ozone.ksm;
import com.google.common.base.Preconditions;
import org.apache.hadoop.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ksm.helpers.KsmKeyLocationInfo;
+import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
import org.apache.hadoop.ozone.ksm.exceptions.KSMException.ResultCodes;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyInfo;
@@ -31,8 +33,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_KEY;
+
/**
* Implementation of keyManager.
*/
@@ -45,11 +51,14 @@ public class KeyManagerImpl implements KeyManager {
*/
private final ScmBlockLocationProtocol scmBlockClient;
private final MetadataManager metadataManager;
+ private final long scmBlockSize;
public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
- MetadataManager metadataManager) {
+ MetadataManager metadataManager, OzoneConfiguration conf) {
this.scmBlockClient = scmBlockClient;
this.metadataManager = metadataManager;
+ this.scmBlockSize = conf.getLong(OZONE_SCM_BLOCK_SIZE_KEY,
+ OZONE_SCM_BLOCK_SIZE_DEFAULT);
}
@Override
@@ -92,17 +101,37 @@ public class KeyManagerImpl implements KeyManager {
// with a actual SCM block.
// TODO : Review this decision later. We can get away with only a
// metadata entry in case of 0 length key.
- AllocatedBlock allocatedBlock =
- scmBlockClient.allocateBlock(Math.max(args.getDataSize(), 1));
+ long targetSize = args.getDataSize();
+ List<KsmKeyLocationInfo> subKeyInfos = new ArrayList<>();
+ int idx = 0;
+ long offset = 0;
+
+ // in case targetSize == 0, subKeyInfos will be an empty list
+ while (targetSize > 0) {
+ long allocateSize = Math.min(targetSize, scmBlockSize);
+ AllocatedBlock allocatedBlock =
+ scmBlockClient.allocateBlock(allocateSize);
+ KsmKeyLocationInfo subKeyInfo = new KsmKeyLocationInfo.Builder()
+ .setContainerName(allocatedBlock.getPipeline().getContainerName())
+ .setBlockID(allocatedBlock.getKey())
+ .setShouldCreateContainer(allocatedBlock.getCreateContainer())
+ .setIndex(idx)
+ .setLength(allocateSize)
+ .setOffset(offset)
+ .build();
+ idx += 1;
+ offset += allocateSize;
+ targetSize -= allocateSize;
+ subKeyInfos.add(subKeyInfo);
+ }
+
long currentTime = Time.now();
KsmKeyInfo keyBlock = new KsmKeyInfo.Builder()
.setVolumeName(args.getVolumeName())
.setBucketName(args.getBucketName())
.setKeyName(args.getKeyName())
.setDataSize(args.getDataSize())
- .setBlockID(allocatedBlock.getKey())
- .setContainerName(allocatedBlock.getPipeline().getContainerName())
- .setShouldCreateContainer(allocatedBlock.getCreateContainer())
+ .setKsmKeyLocationInfos(subKeyInfos)
.setCreationTime(currentTime)
.setModificationTime(currentTime)
.build();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
index 435b243..94c0975 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
@@ -98,7 +98,8 @@ public class KeySpaceManager implements KeySpaceManagerProtocol {
volumeManager = new VolumeManagerImpl(metadataManager, conf);
bucketManager = new BucketManagerImpl(metadataManager);
metrics = KSMMetrics.create();
- keyManager = new KeyManagerImpl(getScmBlockClient(conf), metadataManager);
+ keyManager = new KeyManagerImpl(
+ getScmBlockClient(conf), metadataManager, conf);
httpServer = new KeySpaceManagerHttpServer(conf);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
index 30a2d22..9e4053e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/cli/SQLCLI.java
@@ -400,11 +400,13 @@ public class SQLCLI extends Configured implements Tool {
break;
case KEY:
KeyInfo keyInfo = KeyInfo.parseFrom(value);
+ // TODO : the two fields container name and block id are no longer used,
+ // need to revisit this later.
String insertKeyInfo =
String.format(INSERT_KEY_INFO, keyInfo.getVolumeName(),
keyInfo.getBucketName(), keyInfo.getKeyName(),
- keyInfo.getDataSize(), keyInfo.getBlockKey(),
- keyInfo.getContainerName());
+ keyInfo.getDataSize(), "EMPTY",
+ "EMPTY");
executeSQL(conn, insertKeyInfo);
break;
default:
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/KeyHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/KeyHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/KeyHandler.java
index 0655a0f..66aeb62 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/KeyHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/handlers/KeyHandler.java
@@ -166,6 +166,7 @@ public class KeyHandler implements Keys {
String contentLenString = getContentLength(headers, args);
String newLen = contentLenString.replaceAll("\"", "");
int contentLen = Integer.parseInt(newLen);
+ args.setSize(contentLen);
MessageDigest md5 = MessageDigest.getInstance("MD5");
int bytesRead = 0;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkGroupInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkGroupInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkGroupInputStream.java
new file mode 100644
index 0000000..12df012
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkGroupInputStream.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.web.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ksm.helpers.KsmKeyLocationInfo;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.storage.ChunkInputStream;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Maintaining a list of ChunkInputStream. Read based on offset.
+ */
+public class ChunkGroupInputStream extends InputStream {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ChunkGroupInputStream.class);
+
+ private static final int EOF = -1;
+
+ private final ArrayList<ChunkInputStreamEntry> streamEntries;
+ private int currentStreamIndex;
+
+ public ChunkGroupInputStream() {
+ streamEntries = new ArrayList<>();
+ currentStreamIndex = 0;
+ }
+
+ @VisibleForTesting
+ public synchronized int getCurrentStreamIndex() {
+ return currentStreamIndex;
+ }
+
+ @VisibleForTesting
+ public long getRemainingOfIndex(int index) {
+ return streamEntries.get(index).getRemaining();
+ }
+
+ /**
+ * Append another stream to the end of the list.
+ *
+ * @param stream the stream instance.
+ * @param length the max number of bytes that should be read from this
+ * stream.
+ */
+ public synchronized void addStream(InputStream stream, long length) {
+ streamEntries.add(new ChunkInputStreamEntry(stream, length));
+ }
+
+
+ @Override
+ public synchronized int read() throws IOException {
+ if (streamEntries.size() <= currentStreamIndex) {
+ throw new IndexOutOfBoundsException();
+ }
+ ChunkInputStreamEntry entry = streamEntries.get(currentStreamIndex);
+ int data = entry.read();
+ return data;
+ }
+
+ @Override
+ public synchronized int read(byte[] b, int off, int len) throws IOException {
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
+ if (len == 0) {
+ return 0;
+ }
+ int totalReadLen = 0;
+ while (len > 0) {
+ if (streamEntries.size() <= currentStreamIndex) {
+ return totalReadLen == 0 ? EOF : totalReadLen;
+ }
+ ChunkInputStreamEntry current = streamEntries.get(currentStreamIndex);
+ int readLen = Math.min(len, (int)current.getRemaining());
+ int actualLen = current.read(b, off, readLen);
+ // this means the underlying stream has nothing at all, return
+ if (actualLen == EOF) {
+ return totalReadLen > 0? totalReadLen : EOF;
+ }
+ totalReadLen += actualLen;
+ // this means there is no more data to read beyond this point, return
+ if (actualLen != readLen) {
+ return totalReadLen;
+ }
+ off += readLen;
+ len -= readLen;
+ if (current.getRemaining() <= 0) {
+ currentStreamIndex += 1;
+ }
+ }
+ return totalReadLen;
+ }
+
+ private static class ChunkInputStreamEntry extends InputStream {
+
+ private final InputStream inputStream;
+ private final long length;
+ private long currentPosition;
+
+
+ ChunkInputStreamEntry(InputStream chunkInputStream, long length) {
+ this.inputStream = chunkInputStream;
+ this.length = length;
+ this.currentPosition = 0;
+ }
+
+ synchronized long getRemaining() {
+ return length - currentPosition;
+ }
+
+ @Override
+ public synchronized int read(byte[] b, int off, int len)
+ throws IOException {
+ int readLen = inputStream.read(b, off, len);
+ currentPosition += readLen;
+ return readLen;
+ }
+
+ @Override
+ public synchronized int read() throws IOException {
+ int data = inputStream.read();
+ currentPosition += 1;
+ return data;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ inputStream.close();
+ }
+ }
+
+ public static LengthInputStream getFromKsmKeyInfo(KsmKeyInfo keyInfo,
+ XceiverClientManager xceiverClientManager,
+ StorageContainerLocationProtocolClientSideTranslatorPB
+ storageContainerLocationClient, String requestId)
+ throws IOException {
+ int index = 0;
+ long length = 0;
+ String containerKey;
+ ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream();
+ for (KsmKeyLocationInfo ksmKeyLocationInfo : keyInfo.getKeyLocationList()) {
+ // check index as sanity check
+ Preconditions.checkArgument(index++ == ksmKeyLocationInfo.getIndex());
+ String containerName = ksmKeyLocationInfo.getContainerName();
+ Pipeline pipeline =
+ storageContainerLocationClient.getContainer(containerName);
+ XceiverClientSpi xceiverClient =
+ xceiverClientManager.acquireClient(pipeline);
+ boolean success = false;
+ containerKey = ksmKeyLocationInfo.getBlockID();
+ try {
+ LOG.debug("get key accessing {} {}",
+ xceiverClient.getPipeline().getContainerName(), containerKey);
+ ContainerProtos.KeyData containerKeyData = OzoneContainerTranslation
+ .containerKeyDataForRead(
+ xceiverClient.getPipeline().getContainerName(), containerKey);
+ ContainerProtos.GetKeyResponseProto response = ContainerProtocolCalls
+ .getKey(xceiverClient, containerKeyData, requestId);
+ List<ContainerProtos.ChunkInfo> chunks =
+ response.getKeyData().getChunksList();
+ for (ContainerProtos.ChunkInfo chunk : chunks) {
+ length += chunk.getLen();
+ }
+ success = true;
+ ChunkInputStream inputStream = new ChunkInputStream(
+ containerKey, xceiverClientManager, xceiverClient,
+ chunks, requestId);
+ groupInputStream.addStream(inputStream,
+ ksmKeyLocationInfo.getLength());
+ } finally {
+ if (!success) {
+ xceiverClientManager.releaseClient(xceiverClient);
+ }
+ }
+ }
+ return new LengthInputStream(groupInputStream, length);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkGroupOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkGroupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkGroupOutputStream.java
new file mode 100644
index 0000000..f479f0a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/ChunkGroupOutputStream.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.web.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ksm.helpers.KsmKeyLocationInfo;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.scm.storage.ChunkOutputStream;
+import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+
+/**
+ * Maintaining a list of ChunkOutputStream. Write based on offset.
+ *
+ * Note that this may write to multiple containers in one write call. In case
+ * that first container succeeded but later ones failed, the succeeded writes
+ * are not rolled back.
+ *
+ * TODO : currently not support multi-thread access.
+ */
+public class ChunkGroupOutputStream extends OutputStream {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ChunkGroupOutputStream.class);
+
+ // array list's get(index) is O(1)
+ private final ArrayList<ChunkOutputStreamEntry> streamEntries;
+ private int currentStreamIndex;
+ private long totalSize;
+ private long byteOffset;
+
+ public ChunkGroupOutputStream() {
+ this.streamEntries = new ArrayList<>();
+ this.currentStreamIndex = 0;
+ this.totalSize = 0;
+ this.byteOffset = 0;
+ }
+
+ @VisibleForTesting
+ public long getByteOffset() {
+ return byteOffset;
+ }
+
+ /**
+ * Append another stream to the end of the list. Note that the streams are not
+ * actually created at this point; only enough metadata about the stream is
+ * stored. When something is to be actually written to the stream, the stream
+ * will be created (if not already).
+ *
+ * @param containerKey the key to store in the container
+ * @param key the ozone key
+ * @param xceiverClientManager xceiver manager instance
+ * @param xceiverClient xceiver client instance
+ * @param requestID the request id
+ * @param chunkSize the chunk size for this key chunks
+ * @param length the max number of bytes to be written to this stream
+ */
+ public synchronized void addStream(String containerKey, String key,
+ XceiverClientManager xceiverClientManager, XceiverClientSpi xceiverClient,
+ String requestID, int chunkSize, long length) {
+ streamEntries.add(new ChunkOutputStreamEntry(containerKey, key,
+ xceiverClientManager, xceiverClient, requestID, chunkSize, length));
+ totalSize += length;
+ }
+
+ @VisibleForTesting
+ public synchronized void addStream(OutputStream outputStream, long length) {
+ streamEntries.add(new ChunkOutputStreamEntry(outputStream, length));
+ totalSize += length;
+ }
+
+ @Override
+ public synchronized void write(int b) throws IOException {
+ if (streamEntries.size() <= currentStreamIndex) {
+ throw new IndexOutOfBoundsException();
+ }
+ ChunkOutputStreamEntry entry = streamEntries.get(currentStreamIndex);
+ entry.write(b);
+ if (entry.getRemaining() <= 0) {
+ currentStreamIndex += 1;
+ }
+ byteOffset += 1;
+ }
+
+ /**
+ * Try to write the bytes sequence b[off:off+len) to streams.
+ *
+ * NOTE: Throws exception if the data could not fit into the remaining space.
+ * In which case nothing will be written.
+ * TODO:May need to revisit this behaviour.
+ *
+ * @param b byte data
+ * @param off starting offset
+ * @param len length to write
+ * @throws IOException
+ */
+ @Override
+ public synchronized void write(byte[] b, int off, int len)
+ throws IOException {
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if ((off < 0) || (off > b.length) || (len < 0) ||
+ ((off + len) > b.length) || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException();
+ }
+ if (len == 0) {
+ return;
+ }
+ if (streamEntries.size() <= currentStreamIndex) {
+ throw new IOException("Write out of stream range! stream index:" +
+ currentStreamIndex);
+ }
+ if (totalSize - byteOffset < len) {
+ throw new IOException("Can not write " + len + " bytes with only " +
+ (totalSize - byteOffset) + " byte space");
+ }
+ while (len > 0) {
+ // in theory, this condition should never be violated due to the check
+ // above; still do a sanity check.
+ Preconditions.checkArgument(currentStreamIndex < streamEntries.size());
+ ChunkOutputStreamEntry current = streamEntries.get(currentStreamIndex);
+ int writeLen = Math.min(len, (int)current.getRemaining());
+ current.write(b, off, writeLen);
+ if (current.getRemaining() <= 0) {
+ currentStreamIndex += 1;
+ }
+ len -= writeLen;
+ off += writeLen;
+ byteOffset += writeLen;
+ }
+ }
+
+ @Override
+ public synchronized void flush() throws IOException {
+ for (int i = 0; i <= currentStreamIndex; i++) {
+ streamEntries.get(i).flush();
+ }
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ for (ChunkOutputStreamEntry entry : streamEntries) {
+ entry.close();
+ }
+ }
+
+ private static class ChunkOutputStreamEntry extends OutputStream {
+ private OutputStream outputStream;
+ private final String containerKey;
+ private final String key;
+ private final XceiverClientManager xceiverClientManager;
+ private final XceiverClientSpi xceiverClient;
+ private final String requestId;
+ private final int chunkSize;
+ // total number of bytes that should be written to this stream
+ private final long length;
+ // the current position of this stream 0 <= currentPosition <= length
+ private long currentPosition;
+
+ ChunkOutputStreamEntry(String containerKey, String key,
+ XceiverClientManager xceiverClientManager,
+ XceiverClientSpi xceiverClient, String requestId, int chunkSize,
+ long length) {
+ this.outputStream = null;
+ this.containerKey = containerKey;
+ this.key = key;
+ this.xceiverClientManager = xceiverClientManager;
+ this.xceiverClient = xceiverClient;
+ this.requestId = requestId;
+ this.chunkSize = chunkSize;
+
+ this.length = length;
+ this.currentPosition = 0;
+ }
+
+ /**
+ * For testing purposes, takes an already created stream instance.
+ * @param outputStream an existing writable output stream
+ * @param length the length of data to write to the stream
+ */
+ ChunkOutputStreamEntry(OutputStream outputStream, long length) {
+ this.outputStream = outputStream;
+ this.containerKey = null;
+ this.key = null;
+ this.xceiverClientManager = null;
+ this.xceiverClient = null;
+ this.requestId = null;
+ this.chunkSize = -1;
+
+ this.length = length;
+ this.currentPosition = 0;
+ }
+
+ long getLength() {
+ return length;
+ }
+
+ long getRemaining() {
+ return length - currentPosition;
+ }
+
+ private synchronized void checkStream() {
+ if (this.outputStream == null) {
+ this.outputStream = new ChunkOutputStream(containerKey,
+ key, xceiverClientManager, xceiverClient,
+ requestId, chunkSize);
+ }
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ checkStream();
+ outputStream.write(b);
+ this.currentPosition += 1;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ checkStream();
+ outputStream.write(b, off, len);
+ this.currentPosition += len;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (this.outputStream != null) {
+ this.outputStream.flush();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (this.outputStream != null) {
+ this.outputStream.close();
+ }
+ }
+ }
+
+ public static ChunkGroupOutputStream getFromKsmKeyInfo(
+ KsmKeyInfo keyInfo, XceiverClientManager xceiverClientManager,
+ StorageContainerLocationProtocolClientSideTranslatorPB
+ storageContainerLocationClient,
+ int chunkSize, String requestId) throws IOException {
+ // TODO: the following createContainer and key writes may fail, in which
+ // case we should revert the above allocateKey to KSM.
+ // check index as sanity check
+ int index = 0;
+ String containerKey;
+ ChunkGroupOutputStream groupOutputStream = new ChunkGroupOutputStream();
+ for (KsmKeyLocationInfo subKeyInfo : keyInfo.getKeyLocationList()) {
+ containerKey = subKeyInfo.getBlockID();
+
+ Preconditions.checkArgument(index++ == subKeyInfo.getIndex());
+ String containerName = subKeyInfo.getContainerName();
+ Pipeline pipeline =
+ storageContainerLocationClient.getContainer(containerName);
+ XceiverClientSpi xceiverClient =
+ xceiverClientManager.acquireClient(pipeline);
+ // create container if needed
+ // TODO : should be subKeyInfo.getShouldCreateContainer(), but for now
+ // always true.
+ boolean shouldCreate = true;
+ if (shouldCreate) {
+ try {
+ ContainerProtocolCalls.createContainer(xceiverClient, requestId);
+ } catch (StorageContainerException sce) {
+ LOG.warn("Create container failed with {}", containerName, sce);
+ }
+ }
+
+ groupOutputStream.addStream(containerKey, keyInfo.getKeyName(),
+ xceiverClientManager, xceiverClient, requestId, chunkSize,
+ subKeyInfo.getLength());
+ }
+ return groupOutputStream;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
index 1bf5b11..51affd3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/web/storage/DistributedStorageHandler.java
@@ -19,12 +19,6 @@
package org.apache.hadoop.ozone.web.storage;
import com.google.common.base.Strings;
-import org.apache.hadoop.hdfs.ozone.protocol.proto
- .ContainerProtos.ChunkInfo;
-import org.apache.hadoop.hdfs.ozone.protocol.proto
- .ContainerProtos.GetKeyResponseProto;
-import org.apache.hadoop.hdfs.ozone.protocol.proto
- .ContainerProtos.KeyData;
import org.apache.hadoop.hdfs.server.datanode.fsdataset
.LengthInputStream;
import org.apache.hadoop.ksm.helpers.KsmBucketArgs;
@@ -37,12 +31,12 @@ import org.apache.hadoop.ksm.protocolPB
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.OzoneConsts.Versioning;
+import org.apache.hadoop.ozone.io.OzoneOutputStream;
import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos;
import org.apache.hadoop.ozone.protocolPB.KSMPBHelper;
import org.apache.hadoop.ozone.ksm.KSMConfigKeys;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.web.request.OzoneQuota;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.protocolPB
@@ -61,21 +55,12 @@ import org.apache.hadoop.ozone.web.response.ListBuckets;
import org.apache.hadoop.ozone.web.response.BucketInfo;
import org.apache.hadoop.ozone.web.response.KeyInfo;
import org.apache.hadoop.ozone.web.response.ListKeys;
-import org.apache.hadoop.scm.XceiverClientSpi;
-import org.apache.hadoop.scm.storage.ChunkInputStream;
-import org.apache.hadoop.scm.storage.ChunkOutputStream;
-import org.apache.hadoop.scm.storage.ContainerProtocolCalls;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.TimeZone;
-import java.util.Locale;
import java.util.List;
/**
@@ -104,9 +89,9 @@ public final class DistributedStorageHandler implements StorageHandler {
*/
public DistributedStorageHandler(OzoneConfiguration conf,
StorageContainerLocationProtocolClientSideTranslatorPB
- storageContainerLocation,
+ storageContainerLocation,
KeySpaceManagerProtocolClientSideTranslatorPB
- keySpaceManagerClient) {
+ keySpaceManagerClient) {
this.keySpaceManagerClient = keySpaceManagerClient;
this.storageContainerLocationClient = storageContainerLocation;
this.xceiverClientManager = new XceiverClientManager(conf);
@@ -119,8 +104,8 @@ public final class DistributedStorageHandler implements StorageHandler {
KSMConfigKeys.OZONE_KSM_GROUP_RIGHTS_DEFAULT);
if(chunkSize > ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE) {
LOG.warn("The chunk size ({}) is not allowed to be more than"
- + " the maximum size ({}),"
- + " resetting to the maximum size.",
+ + " the maximum size ({}),"
+ + " resetting to the maximum size.",
chunkSize, ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE);
chunkSize = ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;
}
@@ -159,7 +144,7 @@ public final class DistributedStorageHandler implements StorageHandler {
public void setVolumeQuota(VolumeArgs args, boolean remove)
throws IOException, OzoneException {
long quota = remove ? OzoneConsts.MAX_QUOTA_IN_BYTES :
- args.getQuota().sizeInBytes();
+ args.getQuota().sizeInBytes();
keySpaceManagerClient.setQuota(args.getVolumeName(), quota);
}
@@ -397,22 +382,11 @@ public final class DistributedStorageHandler implements StorageHandler {
.setDataSize(args.getSize())
.build();
// contact KSM to allocate a block for key.
- String containerKey = buildContainerKey(args.getVolumeName(),
- args.getBucketName(), args.getKeyName());
KsmKeyInfo keyInfo = keySpaceManagerClient.allocateKey(keyArgs);
- // TODO the following createContainer and key writes may fail, in which
- // case we should revert the above allocateKey to KSM.
- String containerName = keyInfo.getContainerName();
- XceiverClientSpi xceiverClient = getContainer(containerName);
- if (keyInfo.getShouldCreateContainer()) {
- LOG.debug("Need to create container {} for key: {}/{}/{}", containerName,
- args.getVolumeName(), args.getBucketName(), args.getKeyName());
- ContainerProtocolCalls.createContainer(
- xceiverClient, args.getRequestID());
- }
- // establish a connection to the container to write the key
- return new ChunkOutputStream(containerKey, args.getKeyName(),
- xceiverClientManager, xceiverClient, args.getRequestID(), chunkSize);
+ ChunkGroupOutputStream groupOutputStream =
+ ChunkGroupOutputStream.getFromKsmKeyInfo(keyInfo, xceiverClientManager,
+ storageContainerLocationClient, chunkSize, args.getRequestID());
+ return new OzoneOutputStream(groupOutputStream);
}
@Override
@@ -431,33 +405,9 @@ public final class DistributedStorageHandler implements StorageHandler {
.setDataSize(args.getSize())
.build();
KsmKeyInfo keyInfo = keySpaceManagerClient.lookupKey(keyArgs);
- String containerKey = buildContainerKey(args.getVolumeName(),
- args.getBucketName(), args.getKeyName());
- String containerName = keyInfo.getContainerName();
- XceiverClientSpi xceiverClient = getContainer(containerName);
- boolean success = false;
- try {
- LOG.debug("get key accessing {} {}",
- xceiverClient.getPipeline().getContainerName(), containerKey);
- KeyData containerKeyData = OzoneContainerTranslation
- .containerKeyDataForRead(
- xceiverClient.getPipeline().getContainerName(), containerKey);
- GetKeyResponseProto response = ContainerProtocolCalls
- .getKey(xceiverClient, containerKeyData, args.getRequestID());
- long length = 0;
- List<ChunkInfo> chunks = response.getKeyData().getChunksList();
- for (ChunkInfo chunk : chunks) {
- length += chunk.getLen();
- }
- success = true;
- return new LengthInputStream(new ChunkInputStream(
- containerKey, xceiverClientManager, xceiverClient,
- chunks, args.getRequestID()), length);
- } finally {
- if (!success) {
- xceiverClientManager.releaseClient(xceiverClient);
- }
- }
+ return ChunkGroupInputStream.getFromKsmKeyInfo(
+ keyInfo, xceiverClientManager, storageContainerLocationClient,
+ args.getRequestID());
}
@Override
@@ -535,37 +485,6 @@ public final class DistributedStorageHandler implements StorageHandler {
}
}
- private XceiverClientSpi getContainer(String containerName)
- throws IOException {
- Pipeline pipeline =
- storageContainerLocationClient.getContainer(containerName);
- return xceiverClientManager.acquireClient(pipeline);
- }
-
- /**
- * Creates a container key from any number of components by combining all
- * components with a delimiter.
- *
- * @param parts container key components
- * @return container key
- */
- private static String buildContainerKey(String... parts) {
- return '/' + StringUtils.join('/', parts);
- }
-
- /**
- * Formats a date in the expected string format.
- *
- * @param date the date to format
- * @return formatted string representation of date
- */
- private static String dateToString(Date date) {
- SimpleDateFormat sdf =
- new SimpleDateFormat(OzoneConsts.OZONE_DATE_FORMAT, Locale.US);
- sdf.setTimeZone(TimeZone.getTimeZone(OzoneConsts.OZONE_TIME_ZONE));
- return sdf.format(date);
- }
-
/**
* Closes DistributedStorageHandler.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index cf14013..be9a48a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -813,4 +813,12 @@
Port used by cblock to connect to SCM.
</description>
</property>
+
+ <property>
+ <name>ozone.scm.block.size</name>
+ <value>268435456</value>
+ <description>
+ The default size of a scm block in bytes.
+ </description>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestChunkStreams.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestChunkStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestChunkStreams.java
new file mode 100644
index 0000000..2c936df
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestChunkStreams.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.ozone.web.storage.ChunkGroupInputStream;
+import org.apache.hadoop.ozone.web.storage.ChunkGroupOutputStream;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This class tests ChunkGroupInputStream and ChunkGroupOutputStream.
+ */
+public class TestChunkStreams {
+
+ @Rule
+ public ExpectedException exception = ExpectedException.none();
+
+ /**
+ * This test uses ByteArrayOutputStream as the underlying stream to test
+ * the correctness of ChunkGroupOutputStream.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testWriteGroupOutputStream() throws Exception {
+ try (ChunkGroupOutputStream groupOutputStream =
+ new ChunkGroupOutputStream()) {
+ ArrayList<OutputStream> outputStreams = new ArrayList<>();
+
+ // 5 byte streams, each 100 bytes. write 500 bytes means writing to each
+ // of them with 100 bytes.
+ for (int i = 0; i < 5; i++) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream(100);
+ outputStreams.add(out);
+ groupOutputStream.addStream(out, 100);
+ }
+ assertEquals(0, groupOutputStream.getByteOffset());
+
+ String dataString = RandomStringUtils.randomAscii(500);
+ byte[] data = dataString.getBytes();
+ groupOutputStream.write(data, 0, data.length);
+ assertEquals(500, groupOutputStream.getByteOffset());
+
+ String res = "";
+ int offset = 0;
+ for (OutputStream stream : outputStreams) {
+ String subString = stream.toString();
+ res += subString;
+ assertEquals(dataString.substring(offset, offset + 100), subString);
+ offset += 100;
+ }
+ assertEquals(dataString, res);
+ }
+ }
+
+ @Test
+ public void testErrorWriteGroupOutputStream() throws Exception {
+ try (ChunkGroupOutputStream groupOutputStream =
+ new ChunkGroupOutputStream()) {
+ ArrayList<OutputStream> outputStreams = new ArrayList<>();
+
+ // 5 byte streams, each 100 bytes. write 500 bytes means writing to each
+ // of them with 100 bytes. all 5 streams makes up a ChunkGroupOutputStream
+ // with a total of 500 bytes in size
+ for (int i = 0; i < 5; i++) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream(100);
+ outputStreams.add(out);
+ groupOutputStream.addStream(out, 100);
+ }
+ assertEquals(0, groupOutputStream.getByteOffset());
+
+ // first writes of 100 bytes should succeed
+ groupOutputStream.write(RandomStringUtils.randomAscii(100).getBytes());
+ assertEquals(100, groupOutputStream.getByteOffset());
+
+ // second writes of 500 bytes should fail, as there should be only 400
+ // bytes space left
+ // TODO : if we decide to take the 400 bytes instead in the future,
+ // or add a more informative error code rather than an exception, this
+ // part needs to change.
+ exception.expect(IOException.class);
+ exception.expectMessage(
+ "Can not write 500 bytes with only 400 byte space");
+ groupOutputStream.write(RandomStringUtils.randomAscii(500).getBytes());
+ assertEquals(100, groupOutputStream.getByteOffset());
+ }
+ }
+
+ @Test
+ public void testReadGroupInputStream() throws Exception {
+ try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) {
+ ArrayList<InputStream> inputStreams = new ArrayList<>();
+
+ String dataString = RandomStringUtils.randomAscii(500);
+ byte[] buf = dataString.getBytes();
+ int offset = 0;
+ for (int i = 0; i < 5; i++) {
+ ByteArrayInputStream in = new ByteArrayInputStream(buf, offset, 100);
+ inputStreams.add(in);
+ offset += 100;
+ groupInputStream.addStream(in, 100);
+ }
+
+ byte[] resBuf = new byte[500];
+ int len = groupInputStream.read(resBuf, 0, 500);
+
+ assertEquals(500, len);
+ assertEquals(dataString, new String(resBuf));
+ }
+ }
+
+ @Test
+ public void testErrorReadGroupInputStream() throws Exception {
+ try (ChunkGroupInputStream groupInputStream = new ChunkGroupInputStream()) {
+ ArrayList<InputStream> inputStreams = new ArrayList<>();
+
+ String dataString = RandomStringUtils.randomAscii(500);
+ byte[] buf = dataString.getBytes();
+ int offset = 0;
+ for (int i = 0; i < 5; i++) {
+ ByteArrayInputStream in = new ByteArrayInputStream(buf, offset, 100);
+ inputStreams.add(in);
+ offset += 100;
+ groupInputStream.addStream(in, 100);
+ }
+
+ byte[] resBuf = new byte[600];
+ // read 340 bytes first
+ int len = groupInputStream.read(resBuf, 0, 340);
+ assertEquals(3, groupInputStream.getCurrentStreamIndex());
+ assertEquals(60, groupInputStream.getRemainingOfIndex(3));
+ assertEquals(340, len);
+ assertEquals(dataString.substring(0, 340),
+ new String(resBuf).substring(0, 340));
+
+ // try to read the following 260 bytes, but only 160 are left
+ len = groupInputStream.read(resBuf, 340, 260);
+ assertEquals(5, groupInputStream.getCurrentStreamIndex());
+ assertEquals(0, groupInputStream.getRemainingOfIndex(4));
+ assertEquals(160, len);
+ assertEquals(dataString, new String(resBuf).substring(0, 500));
+
+ // further read should get EOF
+ len = groupInputStream.read(resBuf, 0, 1);
+ // reached EOF, further read should get -1
+ assertEquals(-1, len);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/42ab44d3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestMultipleContainerReadWrite.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestMultipleContainerReadWrite.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestMultipleContainerReadWrite.java
new file mode 100644
index 0000000..c310055
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/ksm/TestMultipleContainerReadWrite.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.web.handlers.BucketArgs;
+import org.apache.hadoop.ozone.web.handlers.KeyArgs;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
+import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.scm.ScmConfigKeys;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.LinkedList;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test key write/read where a key can span multiple containers.
+ * <p>
+ * All tests share one MiniOzoneCluster and one StorageHandler, which are
+ * created once in {@link #init()}.
+ */
+public class TestMultipleContainerReadWrite {
+  private static MiniOzoneCluster cluster = null;
+  private static StorageHandler storageHandler;
+  private static UserArgs userArgs;
+  private static OzoneConfiguration conf;
+
+  // used by testErrorWrite to declare the IOException it expects
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   * <p>
+   * The handler type is set to "distributed" and the SCM block size to 100
+   * bytes; the tests write 500-byte keys against this configuration.
+   *
+   * @throws Exception if the cluster or the storage handler cannot be set up
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    // set to as small as 100 bytes per block.
+    conf.setLong(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_KEY, 100);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 5);
+    conf.set(OzoneConfigKeys.OZONE_HANDLER_TYPE_KEY,
+        OzoneConsts.OZONE_HANDLER_DISTRIBUTED);
+    cluster = new MiniOzoneCluster.Builder(conf)
+        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+    storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
+    userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
+        null, null, null, null);
+  }
+
+  /**
+   * Shutdown MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Creates a randomly named volume and bucket and returns the arguments for
+   * a randomly named key of the given size inside that bucket.
+   *
+   * @param size key size in bytes, passed to KeyArgs#setSize
+   * @return arguments identifying the new key
+   * @throws Exception if the volume or the bucket cannot be created
+   */
+  private static KeyArgs createKeyArgs(int size) throws Exception {
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucketName = "bucket" + RandomStringUtils.randomNumeric(5);
+    String keyName = "key" + RandomStringUtils.randomNumeric(5);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucketName, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    KeyArgs keyArgs = new KeyArgs(volumeName, bucketName, keyName, userArgs);
+    keyArgs.setSize(size);
+    return keyArgs;
+  }
+
+  /**
+   * Writes a 500-byte key and reads it back with a single read call.
+   */
+  @Test
+  public void testWriteRead() throws Exception {
+    String dataString = RandomStringUtils.randomAscii(500);
+    KeyArgs keyArgs = createKeyArgs(500);
+
+    try (OutputStream outputStream = storageHandler.newKeyWriter(keyArgs)) {
+      outputStream.write(dataString.getBytes());
+    }
+
+    byte[] data = new byte[dataString.length()];
+    try (InputStream inputStream = storageHandler.newKeyReader(keyArgs)) {
+      // check the returned length so that a short read is reported as such
+      int readLen = inputStream.read(data, 0, data.length);
+      assertEquals(data.length, readLen);
+    }
+    assertEquals(dataString, new String(data));
+    // checking whether container meta data has the chunk file persisted.
+    // NOTE(review): the cluster is shared by all tests in this class, so
+    // these exact counts presumably rely on no chunk I/O having been
+    // counted before this test -- confirm.
+    MetricsRecordBuilder containerMetrics = getMetrics(
+        "StorageContainerMetrics");
+    assertCounter("numWriteChunk", 5L, containerMetrics);
+    assertCounter("numReadChunk", 5L, containerMetrics);
+  }
+
+  /**
+   * Writes 100 bytes and then 500 more bytes to a key of size 500, and
+   * expects the second write to fail with an IOException.
+   */
+  @Test
+  public void testErrorWrite() throws Exception {
+    String dataString1 = RandomStringUtils.randomAscii(100);
+    String dataString2 = RandomStringUtils.randomAscii(500);
+    KeyArgs keyArgs = createKeyArgs(500);
+
+    try (OutputStream outputStream = storageHandler.newKeyWriter(keyArgs)) {
+      // the first write fits into the key and should succeed
+      outputStream.write(dataString1.getBytes());
+      // the second write exceeds the remaining 400 bytes and should fail
+      exception.expect(IOException.class);
+      exception.expectMessage(
+          "Can not write 500 bytes with only 400 byte space");
+      outputStream.write(dataString2.getBytes());
+    }
+  }
+
+  /**
+   * Reads a 500-byte key with three read calls: 340 bytes, then a request
+   * for 260 bytes that can only return the remaining 160, then EOF.
+   */
+  @Test
+  public void testPartialRead() throws Exception {
+    String dataString = RandomStringUtils.randomAscii(500);
+    KeyArgs keyArgs = createKeyArgs(500);
+
+    try (OutputStream outputStream = storageHandler.newKeyWriter(keyArgs)) {
+      outputStream.write(dataString.getBytes());
+    }
+
+    byte[] data = new byte[600];
+    try (InputStream inputStream = storageHandler.newKeyReader(keyArgs)) {
+      // the first read is fully satisfied
+      int readLen = inputStream.read(data, 0, 340);
+      assertEquals(340, readLen);
+      assertEquals(dataString.substring(0, 340),
+          new String(data).substring(0, 340));
+
+      // 260 bytes are requested, but only 160 are left
+      readLen = inputStream.read(data, 340, 260);
+      assertEquals(160, readLen);
+      assertEquals(dataString, new String(data).substring(0, 500));
+
+      // nothing is left, so a further read should return -1
+      readLen = inputStream.read(data, 500, 1);
+      assertEquals(-1, readLen);
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org