You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sz...@apache.org on 2022/03/15 06:56:29 UTC
[ozone] 04/30: HDDS-5480. [Ozone-Streaming] Client and server should support stream setup. (#2452)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit decaa57a05927da2df7486cce43fdea00d1f5cd9
Author: micah zhao <mi...@tencent.com>
AuthorDate: Wed Jul 28 20:22:53 2021 +0800
HDDS-5480. [Ozone-Streaming] Client and server should support stream setup. (#2452)
---
.../hadoop/hdds/protocol/DatanodeDetails.java | 9 +++--
.../org/apache/hadoop/hdds/ratis/RatisHelper.java | 16 ++++++--
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 18 +++++++++
.../org/apache/hadoop/ozone/audit/DNAction.java | 3 +-
.../helpers/ContainerCommandRequestPBHelper.java | 1 +
.../common/src/main/resources/ozone-default.xml | 20 ++++++++++
.../org/apache/hadoop/hdds/conf/ConfigTag.java | 3 +-
.../container/common/impl/HddsDispatcher.java | 3 +-
.../transport/server/ratis/XceiverServerRatis.java | 46 +++++++++++++++++++++-
.../ozone/container/keyvalue/KeyValueHandler.java | 33 ++++++++++++++++
.../keyvalue/impl/ChunkManagerDispatcher.java | 6 +++
.../keyvalue/impl/FilePerBlockStrategy.java | 8 ++++
.../keyvalue/interfaces/ChunkManager.java | 5 +++
.../container/common/TestDatanodeStateMachine.java | 6 ++-
.../TestCreatePipelineCommandHandler.java | 3 ++
.../hdds/conf/DatanodeRatisServerConfig.java | 35 ++++++++++++++++
.../src/main/proto/DatanodeClientProtocol.proto | 4 +-
.../ozone/container/common/TestEndPoint.java | 4 ++
.../intellij/runConfigurations/Datanode2.xml | 2 +-
.../intellij/runConfigurations/Datanode3.xml | 2 +-
.../org/apache/hadoop/ozone/MiniOzoneCluster.java | 1 +
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 3 ++
.../apache/hadoop/ozone/TestMiniOzoneCluster.java | 2 +
.../server/TestSecureContainerServer.java | 2 +
24 files changed, 220 insertions(+), 15 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index 25826f3..78a0eeb 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -273,8 +273,10 @@ public class DatanodeDetails extends NodeImpl implements
return port;
}
}
- // if no separate admin/server port, return single Ratis one for compat
- if (name == Name.RATIS_ADMIN || name == Name.RATIS_SERVER) {
+ // if no separate admin/server/datastream port, return single Ratis one for
+ // compat
+ if (name == Name.RATIS_ADMIN || name == Name.RATIS_SERVER ||
+ name == Name.RATIS_DATASTREAM) {
return getPort(Name.RATIS);
}
return null;
@@ -784,7 +786,8 @@ public class DatanodeDetails extends NodeImpl implements
* Ports that are supported in DataNode.
*/
public enum Name {
- STANDALONE, RATIS, REST, REPLICATION, RATIS_ADMIN, RATIS_SERVER;
+ STANDALONE, RATIS, REST, REPLICATION, RATIS_ADMIN, RATIS_SERVER,
+ RATIS_DATASTREAM;
public static final Set<Name> ALL_PORTS = ImmutableSet.copyOf(
Name.values());
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
index c1cd865..5e75118 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
@@ -43,6 +43,7 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.SupportedDataStreamType;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.proto.RaftProtos;
@@ -118,7 +119,9 @@ public final class RatisHelper {
.setId(toRaftPeerId(dn))
.setAddress(toRaftPeerAddress(dn, Port.Name.RATIS_SERVER))
.setAdminAddress(toRaftPeerAddress(dn, Port.Name.RATIS_ADMIN))
- .setClientAddress(toRaftPeerAddress(dn, Port.Name.RATIS));
+ .setClientAddress(toRaftPeerAddress(dn, Port.Name.RATIS))
+ .setDataStreamAddress(
+ toRaftPeerAddress(dn, Port.Name.RATIS_DATASTREAM));
}
private static List<RaftPeer> toRaftPeers(Pipeline pipeline) {
@@ -172,6 +175,7 @@ public final class RatisHelper {
ConfigurationSource ozoneConfiguration) throws IOException {
return newRaftClient(rpcType,
toRaftPeerId(pipeline.getLeaderNode()),
+ toRaftPeer(pipeline.getFirstNode()),
newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()),
pipeline.getNodes()), retryPolicy, tlsConfig, ozoneConfiguration);
}
@@ -191,7 +195,7 @@ public final class RatisHelper {
public static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
ConfigurationSource configuration) {
- return newRaftClient(rpcType, leader.getId(),
+ return newRaftClient(rpcType, leader.getId(), leader,
newRaftGroup(Collections.singletonList(leader)), retryPolicy,
tlsConfig, configuration);
}
@@ -199,14 +203,14 @@ public final class RatisHelper {
public static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
RetryPolicy retryPolicy,
ConfigurationSource ozoneConfiguration) {
- return newRaftClient(rpcType, leader.getId(),
+ return newRaftClient(rpcType, leader.getId(), leader,
newRaftGroup(Collections.singletonList(leader)), retryPolicy, null,
ozoneConfiguration);
}
@SuppressWarnings("checkstyle:ParameterNumber")
private static RaftClient newRaftClient(RpcType rpcType, RaftPeerId leader,
- RaftGroup group, RetryPolicy retryPolicy,
+ RaftPeer primary, RaftGroup group, RetryPolicy retryPolicy,
GrpcTlsConfig tlsConfig, ConfigurationSource ozoneConfiguration) {
if (LOG.isTraceEnabled()) {
LOG.trace("newRaftClient: {}, leader={}, group={}",
@@ -214,6 +218,9 @@ public final class RatisHelper {
}
final RaftProperties properties = new RaftProperties();
+ RaftConfigKeys.DataStream.setType(properties,
+ SupportedDataStreamType.NETTY);
+
RaftConfigKeys.Rpc.setType(properties, rpcType);
// Set the ratis client headers which are matching with regex.
@@ -223,6 +230,7 @@ public final class RatisHelper {
.setRaftGroup(group)
.setLeaderId(leader)
.setProperties(properties)
+ .setPrimaryDataStreamServer(primary)
.setRetryPolicy(retryPolicy);
// TODO: GRPC TLS only for now, netty/hadoop RPC TLS support later.
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 627c432..07182c8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -57,6 +57,12 @@ public final class OzoneConfigKeys {
public static final boolean DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT =
false;
+ public static final String DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT =
+ "dfs.container.ratis.datastream.random.port";
+ public static final boolean
+ DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT_DEFAULT =
+ false;
+
public static final String DFS_CONTAINER_CHUNK_WRITE_SYNC_KEY =
"dfs.container.chunk.write.sync";
public static final boolean DFS_CONTAINER_CHUNK_WRITE_SYNC_DEFAULT = false;
@@ -80,6 +86,18 @@ public final class OzoneConfigKeys {
public static final int DFS_CONTAINER_RATIS_SERVER_PORT_DEFAULT = 9856;
/**
+ * Ratis datastream: the enable flag and the port containers listen on.
+ */
+ public static final String DFS_CONTAINER_RATIS_DATASTREAM_ENABLE
+ = "dfs.container.ratis.datastream.enable";
+ public static final boolean DFS_CONTAINER_RATIS_DATASTREAM_ENABLE_DEFAULT
+ = true;
+ public static final String DFS_CONTAINER_RATIS_DATASTREAM_PORT
+ = "dfs.container.ratis.datastream.port";
+ public static final int DFS_CONTAINER_RATIS_DATASTREAM_PORT_DEFAULT
+ = 9855;
+
+ /**
* When set to true, allocate a random free port for ozone container, so that
* a mini cluster is able to launch multiple containers on a node.
*/
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
index 1c87f2b..73aff9a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
@@ -38,7 +38,8 @@ public enum DNAction implements AuditAction {
PUT_SMALL_FILE,
GET_SMALL_FILE,
CLOSE_CONTAINER,
- GET_COMMITTED_BLOCK_LENGTH;
+ GET_COMMITTED_BLOCK_LENGTH,
+ STREAM_INIT;
@Override
public String getAction() {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerCommandRequestPBHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerCommandRequestPBHelper.java
index a13f164..4d7f0f3 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerCommandRequestPBHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerCommandRequestPBHelper.java
@@ -187,6 +187,7 @@ public final class ContainerCommandRequestPBHelper {
case GetSmallFile : return DNAction.GET_SMALL_FILE;
case CloseContainer : return DNAction.CLOSE_CONTAINER;
case GetCommittedBlockLength : return DNAction.GET_COMMITTED_BLOCK_LENGTH;
+ case StreamInit : return DNAction.STREAM_INIT;
default :
LOG.debug("Invalid command type - {}", cmdType);
return null;
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 24f0c45..9fe9e04 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -54,6 +54,26 @@
<description>The ipc port number of container.</description>
</property>
<property>
+ <name>dfs.container.ratis.datastream.enable</name>
+ <value>true</value>
+ <tag>OZONE, CONTAINER, RATIS, DATASTREAM</tag>
+ <description>Whether to enable the datastream IPC of the container.</description>
+ </property>
+ <property>
+ <name>dfs.container.ratis.datastream.port</name>
+ <value>9855</value>
+ <tag>OZONE, CONTAINER, RATIS, DATASTREAM</tag>
+ <description>The datastream port number of container.</description>
+ </property>
+ <property>
+ <name>dfs.container.ratis.datastream.random.port</name>
+ <value>false</value>
+ <tag>OZONE, CONTAINER, RATIS, DATASTREAM</tag>
+ <description>Allocates a random free port for ozone container datastream.
+ This is used only while running unit tests.
+ </description>
+ </property>
+ <property>
<name>dfs.container.ipc.random.port</name>
<value>false</value>
<tag>OZONE, DEBUG, CONTAINER</tag>
diff --git a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
index 8cf584d..3728a0b 100644
--- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
+++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
@@ -46,5 +46,6 @@ public enum ConfigTag {
DELETION,
HA,
BALANCER,
- UPGRADE
+ UPGRADE,
+ DATASTREAM
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 1edd046..9066e6b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -199,7 +199,8 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
boolean isWriteStage =
(cmdType == Type.WriteChunk && dispatcherContext != null
&& dispatcherContext.getStage()
- == DispatcherContext.WriteChunkStage.WRITE_DATA);
+ == DispatcherContext.WriteChunkStage.WRITE_DATA)
+ || (cmdType == Type.StreamInit);
boolean isWriteCommitStage =
(cmdType == Type.WriteChunk && dispatcherContext != null
&& dispatcherContext.getStage()
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index aad9342..577b9bc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -80,6 +80,7 @@ import io.opentracing.util.GlobalTracer;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.SupportedDataStreamType;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.netty.NettyConfigKeys;
@@ -99,6 +100,7 @@ import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerRpc;
@@ -130,6 +132,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
private int serverPort;
private int adminPort;
private int clientPort;
+ private int dataStreamPort;
private final RaftServer server;
private final List<ThreadPoolExecutor> chunkExecutors;
private final ContainerDispatcher dispatcher;
@@ -149,6 +152,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
// Timeout used while calling submitRequest directly.
private long requestTimeout;
private boolean shouldDeleteRatisLogDirectory;
+ private boolean streamEnable;
private XceiverServerRatis(DatanodeDetails dd,
ContainerDispatcher dispatcher, ContainerController containerController,
@@ -158,6 +162,9 @@ public final class XceiverServerRatis implements XceiverServerSpi {
Objects.requireNonNull(dd, "id == null");
datanodeDetails = dd;
assignPorts();
+ this.streamEnable = conf.getBoolean(
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLE,
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLE_DEFAULT);
RaftProperties serverProperties = newRaftProperties();
this.context = context;
this.dispatcher = dispatcher;
@@ -214,6 +221,34 @@ public final class XceiverServerRatis implements XceiverServerSpi {
chunkExecutors, this, conf);
}
+ private void setUpRatisStream(RaftProperties properties) {
+ // set the datastream config
+ if (conf.getBoolean(
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT,
+ OzoneConfigKeys.
+ DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT_DEFAULT)) {
+ dataStreamPort = 0;
+ } else {
+ dataStreamPort = conf.getInt(
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_PORT,
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_PORT_DEFAULT);
+ }
+ NettyConfigKeys.DataStream.setPort(properties, dataStreamPort);
+ RaftConfigKeys.DataStream.setType(properties,
+ SupportedDataStreamType.NETTY);
+ int dataStreamAsyncRequestThreadPoolSize =
+ conf.getObject(DatanodeRatisServerConfig.class)
+ .getStreamRequestThreads();
+ RaftServerConfigKeys.DataStream.setAsyncRequestThreadPoolSize(properties,
+ dataStreamAsyncRequestThreadPoolSize);
+ int dataStreamWriteRequestThreadPoolSize =
+ conf.getObject(DatanodeRatisServerConfig.class)
+ .getStreamWriteThreads();
+ RaftServerConfigKeys.DataStream.setAsyncWriteThreadPoolSize(properties,
+ dataStreamWriteRequestThreadPoolSize);
+ }
+
+ @SuppressWarnings("checkstyle:methodlength")
private RaftProperties newRaftProperties() {
final RaftProperties properties = new RaftProperties();
@@ -232,6 +267,10 @@ public final class XceiverServerRatis implements XceiverServerSpi {
// set the configs enable and set the stateMachineData sync timeout
RaftServerConfigKeys.Log.StateMachineData.setSync(properties, true);
+ if (streamEnable) {
+ setUpRatisStream(properties);
+ }
+
timeUnit = OzoneConfigKeys.
DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT_DEFAULT.getUnit();
duration = conf.getTimeDuration(
@@ -497,7 +536,12 @@ public final class XceiverServerRatis implements XceiverServerSpi {
Port.Name.RATIS_ADMIN);
serverPort = getRealPort(serverRpc.getInetSocketAddress(),
Port.Name.RATIS_SERVER);
-
+ if (streamEnable) {
+ DataStreamServerRpc dataStreamServerRpc =
+ server.getDataStreamServerRpc();
+ dataStreamPort = getRealPort(dataStreamServerRpc.getInetSocketAddress(),
+ Port.Name.RATIS_DATASTREAM);
+ }
isStarted = true;
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 73df5e4..e447509 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -97,6 +97,7 @@ import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuil
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadChunkResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadContainerResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponse;
+import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponseBuilder;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.putBlockResponseSuccess;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest;
@@ -222,6 +223,8 @@ public class KeyValueHandler extends Handler {
return handler.handleDeleteChunk(request, kvContainer);
case WriteChunk:
return handler.handleWriteChunk(request, kvContainer, dispatcherContext);
+ case StreamInit:
+ return handler.handleStreamInit(request, kvContainer, dispatcherContext);
case ListChunk:
return handler.handleUnsupportedOp(request);
case CompactChunk:
@@ -248,6 +251,36 @@ public class KeyValueHandler extends Handler {
return this.blockManager;
}
+ ContainerCommandResponseProto handleStreamInit(
+ ContainerCommandRequestProto request, KeyValueContainer kvContainer,
+ DispatcherContext dispatcherContext) {
+ if (!request.hasWriteChunk()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Malformed Write Chunk request. trace ID: {}",
+ request.getTraceID());
+ }
+ return malformedRequest(request);
+ }
+
+ String path = null;
+ try {
+ checkContainerOpen(kvContainer);
+
+ WriteChunkRequestProto writeChunk = request.getWriteChunk();
+ BlockID blockID = BlockID.getFromProtobuf(writeChunk.getBlockID());
+
+ path = chunkManager
+ .streamInit(kvContainer, blockID);
+
+ } catch (StorageContainerException ex) {
+ return ContainerUtils.logAndReturnError(LOG, ex, request);
+ }
+
+ return getSuccessResponseBuilder(request)
+ .setMessage(path)
+ .build();
+ }
+
/**
* Handles Create Container Request. If successful, adds the container to
* ContainerSet and sends an ICR to the SCM.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
index 7636473..3e2ab46 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
@@ -73,6 +73,12 @@ public class ChunkManagerDispatcher implements ChunkManager {
.writeChunk(container, blockID, info, data, dispatcherContext);
}
+ public String streamInit(Container container, BlockID blockID)
+ throws StorageContainerException {
+ return selectHandler(container)
+ .streamInit(container, blockID);
+ }
+
@Override
public void finishWriteChunks(KeyValueContainer kvContainer,
BlockData blockData) throws IOException {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
index 18c6b9d..a63a9e0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
@@ -90,6 +90,14 @@ public class FilePerBlockStrategy implements ChunkManager {
}
@Override
+ public String streamInit(Container container, BlockID blockID)
+ throws StorageContainerException {
+ checkLayoutVersion(container);
+ File chunkFile = getChunkFile(container, blockID, null);
+ return chunkFile.getAbsolutePath();
+ }
+
+ @Override
public void writeChunk(Container container, BlockID blockID, ChunkInfo info,
ChunkBuffer data, DispatcherContext dispatcherContext)
throws StorageContainerException {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
index 15ff9d6..ba06eeb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
@@ -104,6 +104,11 @@ public interface ChunkManager {
// no-op
}
+ default String streamInit(Container container, BlockID blockID)
+ throws StorageContainerException {
+ return null;
+ }
+
static long getBufferCapacityForChunkRead(ChunkInfo chunkInfo,
long defaultReadBufferCapacity) {
long bufferCapacity = 0;
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
index 1337f28..bb1145b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
@@ -81,6 +81,8 @@ public class TestDatanodeStateMachine {
TimeUnit.MILLISECONDS);
conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, true);
conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true);
+ conf.setBoolean(
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, true);
serverAddresses = new ArrayList<>();
scmServers = new ArrayList<>();
mockServers = new ArrayList<>();
@@ -215,7 +217,6 @@ public class TestDatanodeStateMachine {
OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
datanodeDetails.setPort(port);
ContainerUtils.writeDatanodeDetailsTo(datanodeDetails, idPath);
-
try (DatanodeStateMachine stateMachine =
new DatanodeStateMachine(datanodeDetails, conf, null, null,
null)) {
@@ -424,6 +425,8 @@ public class TestDatanodeStateMachine {
DatanodeDetails.Port.Name.RATIS, 0);
DatanodeDetails.Port restPort = DatanodeDetails.newPort(
DatanodeDetails.Port.Name.REST, 0);
+ DatanodeDetails.Port streamPort = DatanodeDetails.newPort(
+ DatanodeDetails.Port.Name.RATIS_DATASTREAM, 0);
return DatanodeDetails.newBuilder()
.setUuid(UUID.randomUUID())
.setHostName("localhost")
@@ -431,6 +434,7 @@ public class TestDatanodeStateMachine {
.addPort(containerPort)
.addPort(ratisPort)
.addPort(restPort)
+ .addPort(streamPort)
.build();
}
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java
index d23f1c4..ce62640 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java
@@ -38,6 +38,7 @@ import org.apache.ratis.client.api.GroupManagementApi;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.retry.RetryPolicy;
import org.junit.Before;
@@ -98,6 +99,8 @@ public class TestCreatePipelineCommandHandler {
.thenReturn(builder);
Mockito.when(builder.setRetryPolicy(Mockito.any(RetryPolicy.class)))
.thenReturn(builder);
+ Mockito.when(builder.setPrimaryDataStreamServer(
+ Mockito.any(RaftPeer.class))).thenReturn(builder);
return builder;
}
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
index 25ed477..205d92e 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java
@@ -23,6 +23,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import java.time.Duration;
import static org.apache.hadoop.hdds.conf.ConfigTag.DATANODE;
+import static org.apache.hadoop.hdds.conf.ConfigTag.DATASTREAM;
import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
import static org.apache.hadoop.hdds.conf.ConfigTag.PERFORMANCE;
import static org.apache.hadoop.hdds.conf.ConfigTag.RATIS;
@@ -123,6 +124,40 @@ public class DatanodeRatisServerConfig {
this.leaderNumPendingRequests = leaderNumPendingRequests;
}
+ @Config(key = "datastream.request.threads",
+ defaultValue = "20",
+ type = ConfigType.INT,
+ tags = {OZONE, DATANODE, RATIS, DATASTREAM},
+ description = "Maximum number of threads in the thread pool for " +
+ "datastream request."
+ )
+ private int streamRequestThreads;
+
+ public int getStreamRequestThreads() {
+ return streamRequestThreads;
+ }
+
+ public void setStreamRequestThreads(int streamRequestThreads) {
+ this.streamRequestThreads = streamRequestThreads;
+ }
+
+ @Config(key = "datastream.write.threads",
+ defaultValue = "20",
+ type = ConfigType.INT,
+ tags = {OZONE, DATANODE, RATIS, DATASTREAM},
+ description = "Maximum number of threads in the thread pool for " +
+ "datastream write."
+ )
+ private int streamWriteThreads;
+
+ public int getStreamWriteThreads() {
+ return streamWriteThreads;
+ }
+
+ public void setStreamWriteThreads(int streamWriteThreads) {
+ this.streamWriteThreads = streamWriteThreads;
+ }
+
@Config(key = "delete.ratis.log.directory",
defaultValue = "true",
type = ConfigType.BOOLEAN,
diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 01c9a1e..b457da7 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -100,6 +100,8 @@ enum Type {
GetSmallFile = 16;
CloseContainer = 17;
GetCommittedBlockLength = 18;
+
+ StreamInit = 19;
}
@@ -392,7 +394,7 @@ enum ChecksumType {
message WriteChunkRequestProto {
required DatanodeBlockID blockID = 1;
- required ChunkInfo chunkData = 2;
+ optional ChunkInfo chunkData = 2;
optional bytes data = 3;
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index c0ff646..a5fff47 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -145,6 +145,8 @@ public class TestEndPoint {
try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
serverAddress, 1000)) {
DatanodeDetails datanodeDetails = randomDatanodeDetails();
+ conf.setBoolean(
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, true);
OzoneContainer ozoneContainer = new OzoneContainer(
datanodeDetails, conf, getContext(datanodeDetails), null);
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
@@ -169,6 +171,8 @@ public class TestEndPoint {
true);
conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
true);
+ conf.setBoolean(
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, true);
conf.setFromObject(new ReplicationConfig().setPort(0));
try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
serverAddress, 1000)) {
diff --git a/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode2.xml b/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode2.xml
index 3d33020..040b515 100644
--- a/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode2.xml
+++ b/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode2.xml
@@ -18,7 +18,7 @@
<configuration default="false" name="Datanode2" type="Application" factoryName="Application">
<option name="MAIN_CLASS_NAME" value="org.apache.hadoop.ozone.HddsDatanodeService" />
<module name="ozone-datanode" />
- <option name="PROGRAM_PARAMETERS" value="-conf=hadoop-ozone/dev-support/intellij/ozone-site.xml --set ozone.metadata.dirs=/tmp/datanode2 --set hdds.datanode.dir=/tmp/datanode2/storage --set hdds.datanode.http-address=127.0.0.1:10021 --set dfs.container.ratis.ipc=10022 --set dfs.container.ipc=10023 --set dfs.container.ratis.server.port=10024 --set dfs.container.ratis.admin.port=10025 --set hdds.datanode.replication.port=10026" />
+ <option name="PROGRAM_PARAMETERS" value="-conf=hadoop-ozone/dev-support/intellij/ozone-site.xml --set ozone.metadata.dirs=/tmp/datanode2 --set hdds.datanode.dir=/tmp/datanode2/storage --set hdds.datanode.http-address=127.0.0.1:10021 --set dfs.container.ratis.ipc=10022 --set dfs.container.ipc=10023 --set dfs.container.ratis.server.port=10024 --set dfs.container.ratis.admin.port=10025 --set hdds.datanode.replication.port=10026 --set dfs.container.ratis.datastream.port=10027" />
<option name="VM_PARAMETERS" value="-Dlog4j.configuration=file:hadoop-ozone/dev-support/intellij/log4j.properties" />
<extension name="coverage">
<pattern>
diff --git a/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode3.xml b/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode3.xml
index 10b6682..6a3116e 100644
--- a/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode3.xml
+++ b/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode3.xml
@@ -18,7 +18,7 @@
<configuration default="false" name="Datanode3" type="Application" factoryName="Application">
<option name="MAIN_CLASS_NAME" value="org.apache.hadoop.ozone.HddsDatanodeService" />
<module name="ozone-datanode" />
- <option name="PROGRAM_PARAMETERS" value="-conf=hadoop-ozone/dev-support/intellij/ozone-site.xml --set ozone.metadata.dirs=/tmp/datanode3 --set hdds.datanode.dir=/tmp/datanode3/storage --set hdds.datanode.http-address=127.0.0.1:10031 --set dfs.container.ratis.ipc=10032 --set dfs.container.ipc=10033 --set dfs.container.ratis.server.port=10034 --set dfs.container.ratis.admin.port=10035 --set hdds.datanode.replication.port=10036" />
+ <option name="PROGRAM_PARAMETERS" value="-conf=hadoop-ozone/dev-support/intellij/ozone-site.xml --set ozone.metadata.dirs=/tmp/datanode3 --set hdds.datanode.dir=/tmp/datanode3/storage --set hdds.datanode.http-address=127.0.0.1:10031 --set dfs.container.ratis.ipc=10032 --set dfs.container.ipc=10033 --set dfs.container.ratis.server.port=10034 --set dfs.container.ratis.admin.port=10035 --set hdds.datanode.replication.port=10036 --set dfs.container.ratis.datastream.port=10037" />
<option name="VM_PARAMETERS" value="-Dlog4j.configuration=file:hadoop-ozone/dev-support/intellij/log4j.properties" />
<extension name="coverage">
<pattern>
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 4b9bd5f..923e0d0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -318,6 +318,7 @@ public interface MiniOzoneCluster {
protected Optional<String> omId = Optional.empty();
protected Boolean randomContainerPort = true;
+ protected Boolean randomContainerStreamPort = true;
protected Optional<String> datanodeReservedSpace = Optional.empty();
protected Optional<Integer> chunkSize = Optional.empty();
protected OptionalInt streamBufferSize = OptionalInt.empty();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 6f1b0ce..7959e0e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -86,6 +86,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_INIT_D
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ADMIN_PORT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_PORT;
@@ -911,6 +912,8 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
randomContainerPort);
conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
randomContainerPort);
+ conf.setBoolean(DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT,
+ randomContainerStreamPort);
conf.setFromObject(new ReplicationConfig().setPort(0));
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index b52d4df..cb07cfa 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -218,6 +218,8 @@ public class TestMiniOzoneCluster {
ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true);
ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT,
true);
+ ozoneConf.setBoolean(
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, true);
List<DatanodeStateMachine> stateMachines = new ArrayList<>();
try {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
index cd7c995..cb2db30 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
@@ -216,6 +216,8 @@ public class TestSecureContainerServer {
DatanodeDetails dn, OzoneConfiguration conf) throws IOException {
conf.setInt(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
dn.getPort(DatanodeDetails.Port.Name.RATIS).getValue());
+ conf.setBoolean(
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, true);
final String dir = TEST_DIR + dn.getUuid();
conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
final ContainerDispatcher dispatcher = createDispatcher(dn,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org