Posted to common-commits@hadoop.apache.org by ms...@apache.org on 2018/01/23 06:03:38 UTC
hadoop git commit: HDFS-13024. Ozone: ContainerStateMachine should
synchronize operations between createContainer and writeChunk. Contributed by
Mukul Kumar Singh.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7240 b1c8c1de7 -> a6c2b6694
HDFS-13024. Ozone: ContainerStateMachine should synchronize operations between createContainer and writeChunk. Contributed by Mukul Kumar Singh.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a6c2b669
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a6c2b669
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a6c2b669
Branch: refs/heads/HDFS-7240
Commit: a6c2b6694b6607bdff8465bf6641716711e166c6
Parents: b1c8c1d
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Tue Jan 23 11:19:46 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Tue Jan 23 11:19:46 2018 +0530
----------------------------------------------------------------------
.../apache/hadoop/ozone/OzoneConfigKeys.java | 4 +
.../org/apache/hadoop/scm/ScmConfigKeys.java | 6 +-
.../server/ratis/ContainerStateMachine.java | 115 +++++++++++++++----
.../server/ratis/XceiverServerRatis.java | 36 ++++--
.../src/main/resources/ozone-default.xml | 12 +-
5 files changed, 135 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6c2b669/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 8059b5e..22a4787 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -229,6 +229,10 @@ public final class OzoneConfigKeys {
= ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY;
public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT
= ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT;
+ public static final String DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY
+ = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY;
+ public static final int DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT
+ = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT;
public static final int DFS_CONTAINER_CHUNK_MAX_SIZE
= ScmConfigKeys.OZONE_SCM_CHUNK_MAX_SIZE;
public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6c2b669/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
index b41db77..6f5c873 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java
@@ -57,7 +57,11 @@ public final class ScmConfigKeys {
public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY =
"dfs.container.ratis.segment.size";
public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT =
- 128 * 1024 * 1024;
+ 1 * 1024 * 1024 * 1024;
+ public static final String DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY =
+ "dfs.container.ratis.segment.preallocated.size";
+ public static final int
+ DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT = 128 * 1024 * 1024;
// TODO : this is copied from OzoneConsts, may need to move to a better place
public static final String OZONE_SCM_CHUNK_SIZE_KEY = "ozone.scm.chunk.size";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6c2b669/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index a4517b3..c96cc5d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
@@ -45,29 +46,61 @@ import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.ArrayBlockingQueue;
-/** A {@link org.apache.ratis.statemachine.StateMachine} for containers. */
+/** A {@link org.apache.ratis.statemachine.StateMachine} for containers.
+ *
+ * The stateMachine is responsible for handling different types of container
+ * requests. Container requests can be divided into read-only and write
+ * requests.
+ *
+ * Read-only requests are classified by
+ * {@link org.apache.hadoop.scm.XceiverClientRatis#isReadOnly}
+ * and are answered from
+ * {@link #query(RaftClientRequest)}.
+ *
+ * Write requests can be divided into requests with user data
+ * (WriteChunkRequest) and other requests without user data.
+ *
+ * In order to optimize write throughput, the writeChunk request is
+ * processed in two phases. The split happens in
+ * {@link #startTransaction(RaftClientRequest)}: in the first phase the user
+ * data is written directly into the state machine via
+ * {@link #writeStateMachineData}, and in the second phase the
+ * transaction is committed via {@link #applyTransaction(TransactionContext)}.
+ *
+ * For requests with no state machine data, the transaction is committed
+ * directly through
+ * {@link #applyTransaction(TransactionContext)}.
+ *
+ * Two orderings are currently enforced in the code:
+ * 1) Write chunk operations are executed after the create container
+ * operation; otherwise the write chunk operation would fail because the
+ * container has not yet been created. Hence the create container operation
+ * is also split in
+ * {@link #startTransaction(RaftClientRequest)}, which helps synchronize
+ * the calls in {@link #writeStateMachineData}.
+ *
+ * 2) The write chunk commit operation is executed after the write chunk
+ * state machine operation, ensuring the commit is synchronized with the
+ * state machine operation.
+ */
public class ContainerStateMachine extends BaseStateMachine {
static final Logger LOG = LoggerFactory.getLogger(
ContainerStateMachine.class);
private final SimpleStateMachineStorage storage
= new SimpleStateMachineStorage();
private final ContainerDispatcher dispatcher;
- private final ThreadPoolExecutor writeChunkExecutor;
+ private ThreadPoolExecutor writeChunkExecutor;
private final ConcurrentHashMap<String, CompletableFuture<Message>>
- writeChunkMap;
+ writeChunkFutureMap;
+ private final ConcurrentHashMap<String, CompletableFuture<Message>>
+ createContainerFutureMap;
ContainerStateMachine(ContainerDispatcher dispatcher,
- int numWriteChunkThreads) {
+ ThreadPoolExecutor writeChunkExecutor) {
this.dispatcher = dispatcher;
- writeChunkMap = new ConcurrentHashMap<>();
- writeChunkExecutor =
- new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
- 60, TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(1024),
- new ThreadPoolExecutor.CallerRunsPolicy());
+ this.writeChunkExecutor = writeChunkExecutor;
+ this.writeChunkFutureMap = new ConcurrentHashMap<>();
+ this.createContainerFutureMap = new ConcurrentHashMap<>();
}
@Override
@@ -81,13 +114,13 @@ public class ContainerStateMachine extends BaseStateMachine {
throws IOException {
super.initialize(id, properties, raftStorage);
storage.init(raftStorage);
- writeChunkExecutor.prestartAllCoreThreads();
// TODO handle snapshots
// TODO: Add a flag that tells you that initialize has been called.
// Check with Ratis if this feature is done in Ratis.
}
+ @Override
public TransactionContext startTransaction(RaftClientRequest request)
throws IOException {
final ContainerCommandRequestProto proto =
@@ -110,8 +143,12 @@ public class ContainerStateMachine extends BaseStateMachine {
// create the log entry proto
final WriteChunkRequestProto commitWriteChunkProto =
- WriteChunkRequestProto
- .newBuilder(write)
+ WriteChunkRequestProto.newBuilder()
+ .setPipeline(write.getPipeline())
+ .setKeyName(write.getKeyName())
+ .setChunkData(write.getChunkData())
+ // skipping the data field as it is
+ // already set in statemachine data proto
.setStage(ContainerProtos.Stage.COMMIT_DATA)
.build();
ContainerCommandRequestProto commitContainerCommandProto =
@@ -124,6 +161,11 @@ public class ContainerStateMachine extends BaseStateMachine {
.setData(getShadedByteString(commitContainerCommandProto))
.setStateMachineData(getShadedByteString(dataContainerCommandProto))
.build();
+ } else if (proto.getCmdType() == ContainerProtos.Type.CreateContainer) {
+ log = SMLogEntryProto.newBuilder()
+ .setData(request.getMessage().getContent())
+ .setStateMachineData(request.getMessage().getContent())
+ .build();
} else {
log = SMLogEntryProto.newBuilder()
.setData(request.getMessage().getContent())
@@ -154,12 +196,30 @@ public class ContainerStateMachine extends BaseStateMachine {
try {
final ContainerCommandRequestProto requestProto =
getRequestProto(entry.getSmLogEntry().getStateMachineData());
- final WriteChunkRequestProto write = requestProto.getWriteChunk();
- Message raftClientReply = runCommand(requestProto);
- CompletableFuture<Message> future =
- CompletableFuture.completedFuture(raftClientReply);
- writeChunkMap.put(write.getChunkData().getChunkName(),future);
- return future;
+ if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) {
+ String containerName =
+ requestProto.getCreateContainer().getContainerData().getName();
+ createContainerFutureMap.
+ computeIfAbsent(containerName, k -> new CompletableFuture<>());
+ return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
+ } else {
+ final WriteChunkRequestProto write = requestProto.getWriteChunk();
+ String containerName = write.getPipeline().getContainerName();
+ CompletableFuture<Message> future =
+ createContainerFutureMap.get(containerName);
+
+ CompletableFuture<Message> writeChunkFuture;
+ if (future != null) {
+ writeChunkFuture = future.thenApplyAsync(
+ v -> runCommand(requestProto), writeChunkExecutor);
+ } else {
+ writeChunkFuture = CompletableFuture.supplyAsync(
+ () -> runCommand(requestProto), writeChunkExecutor);
+ }
+ writeChunkFutureMap
+ .put(write.getChunkData().getChunkName(), writeChunkFuture);
+ return writeChunkFuture;
+ }
} catch (IOException e) {
return completeExceptionally(e);
}
@@ -186,13 +246,21 @@ public class ContainerStateMachine extends BaseStateMachine {
if (requestProto.getCmdType() == ContainerProtos.Type.WriteChunk) {
WriteChunkRequestProto write = requestProto.getWriteChunk();
+ // the data field has already been removed in startTransaction
+ Preconditions.checkArgument(!write.hasData());
CompletableFuture<Message> stateMachineFuture =
- writeChunkMap.remove(write.getChunkData().getChunkName());
+ writeChunkFutureMap.remove(write.getChunkData().getChunkName());
return stateMachineFuture
.thenComposeAsync(v ->
CompletableFuture.completedFuture(runCommand(requestProto)));
} else {
- return CompletableFuture.completedFuture(runCommand(requestProto));
+ Message message = runCommand(requestProto);
+ if (requestProto.getCmdType() == ContainerProtos.Type.CreateContainer) {
+ String containerName =
+ requestProto.getCreateContainer().getContainerData().getName();
+ createContainerFutureMap.remove(containerName).complete(message);
+ }
+ return CompletableFuture.completedFuture(message);
}
} catch (IOException e) {
return completeExceptionally(e);
@@ -207,6 +275,5 @@ public class ContainerStateMachine extends BaseStateMachine {
@Override
public void close() throws IOException {
- writeChunkExecutor.shutdown();
}
}
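
The ordering between createContainer and writeChunk above is built out of
CompletableFuture chaining. Below is a minimal, self-contained sketch of that
pattern under simplifying assumptions: String stands in for the Ratis
Message/proto types, doWrite() stands in for dispatcher.dispatch(), and the
class and method names are invented for illustration. It shows the technique,
not the actual Ozone API.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

class OrderingSketch {
  private final ConcurrentHashMap<String, CompletableFuture<String>>
      createContainerFutureMap = new ConcurrentHashMap<>();
  private final Executor writeChunkExecutor =
      Executors.newFixedThreadPool(4);

  // Phase 1 (writeStateMachineData): record that a create is in flight.
  void startCreateContainer(String containerName) {
    createContainerFutureMap
        .computeIfAbsent(containerName, k -> new CompletableFuture<>());
  }

  // Phase 2 (applyTransaction): complete the future so queued writes run.
  // Assumes startCreateContainer was called first, as in the patch.
  void commitCreateContainer(String containerName, String result) {
    createContainerFutureMap.remove(containerName).complete(result);
  }

  // writeStateMachineData for a chunk: chain the write behind the pending
  // create, if any; otherwise run it immediately on the executor.
  CompletableFuture<String> writeChunk(String containerName, String chunk) {
    CompletableFuture<String> create =
        createContainerFutureMap.get(containerName);
    return create != null
        ? create.thenApplyAsync(v -> doWrite(chunk), writeChunkExecutor)
        : CompletableFuture.supplyAsync(() -> doWrite(chunk),
            writeChunkExecutor);
  }

  private String doWrite(String chunk) {
    return "wrote " + chunk; // stand-in for the container dispatcher call
  }
}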
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6c2b669/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 7baca25..ff52341 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.ozone.container.common.transport.server
.XceiverServerSpi;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
-import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
@@ -48,6 +47,9 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.util.Objects;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/**
* Creates a ratis server endpoint that acts as the communication layer for
@@ -57,6 +59,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
static final Logger LOG = LoggerFactory.getLogger(XceiverServerRatis.class);
private final int port;
private final RaftServer server;
+ private ThreadPoolExecutor writeChunkExecutor;
private XceiverServerRatis(DatanodeID id, int port, String storageDir,
ContainerDispatcher dispatcher, Configuration conf) throws IOException {
@@ -68,6 +71,9 @@ public final class XceiverServerRatis implements XceiverServerSpi {
final int raftSegmentSize = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY,
OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT);
+ final int raftSegmentPreallocatedSize = conf.getInt(
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
+ OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT);
final int maxChunkSize = OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE;
final int numWriteChunkThreads = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
@@ -76,28 +82,34 @@ public final class XceiverServerRatis implements XceiverServerSpi {
Objects.requireNonNull(id, "id == null");
this.port = port;
RaftProperties serverProperties = newRaftProperties(rpc, port,
- storageDir, maxChunkSize, raftSegmentSize);
-
+ storageDir, maxChunkSize, raftSegmentSize, raftSegmentPreallocatedSize);
+
+ writeChunkExecutor =
+ new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
+ 100, TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(1024),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+ ContainerStateMachine stateMachine =
+ new ContainerStateMachine(dispatcher, writeChunkExecutor);
this.server = RaftServer.newBuilder()
.setServerId(RatisHelper.toRaftPeerId(id))
.setGroup(RatisHelper.emptyRaftGroup())
.setProperties(serverProperties)
- .setStateMachine(new ContainerStateMachine(dispatcher,
- numWriteChunkThreads))
+ .setStateMachine(stateMachine)
.build();
}
private static RaftProperties newRaftProperties(
RpcType rpc, int port, String storageDir, int scmChunkSize,
- int raftSegmentSize) {
+ int raftSegmentSize, int raftSegmentPreallocatedSize) {
final RaftProperties properties = new RaftProperties();
RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true);
RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties,
- SizeInBytes.valueOf(raftSegmentSize));
+ SizeInBytes.valueOf(raftSegmentPreallocatedSize));
RaftServerConfigKeys.Log.setWriteBufferSize(properties,
SizeInBytes.valueOf(scmChunkSize));
RaftServerConfigKeys.Log.setPreallocatedSize(properties,
- SizeInBytes.valueOf(raftSegmentSize));
+ SizeInBytes.valueOf(raftSegmentPreallocatedSize));
RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
SizeInBytes.valueOf(raftSegmentSize));
RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
@@ -106,9 +118,9 @@ public final class XceiverServerRatis implements XceiverServerSpi {
//TODO: change these configs to setter after RATIS-154
properties.setInt("raft.server.log.segment.cache.num.max", 2);
properties.setInt("raft.grpc.message.size.max",
- scmChunkSize + raftSegmentSize);
- properties.setInt("raft.server.rpc.timeout.min", 500);
- properties.setInt("raft.server.rpc.timeout.max", 600);
+ scmChunkSize + raftSegmentPreallocatedSize);
+ properties.setInt("raft.server.rpc.timeout.min", 800);
+ properties.setInt("raft.server.rpc.timeout.max", 1000);
if (rpc == SupportedRpcType.GRPC) {
GrpcConfigKeys.Server.setPort(properties, port);
} else {
@@ -171,12 +183,14 @@ public final class XceiverServerRatis implements XceiverServerSpi {
public void start() throws IOException {
LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
server.getId(), getIPCPort());
+ writeChunkExecutor.prestartAllCoreThreads();
server.start();
}
@Override
public void stop() {
try {
+ writeChunkExecutor.shutdown();
server.close();
} catch (IOException e) {
throw new RuntimeException(e);
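
The patch moves the write-chunk thread pool out of the state machine and ties
its lifecycle to the Ratis server's start() and stop(). Here is a stand-alone
sketch of that lifecycle; the pool parameters (fixed size, 1024-deep queue,
CallerRunsPolicy) are taken from the patch, while the surrounding class is
illustrative only.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class WriteChunkExecutorSketch {
  private final ThreadPoolExecutor writeChunkExecutor;

  WriteChunkExecutorSketch(int numWriteChunkThreads) {
    // Fixed-size pool; CallerRunsPolicy applies back-pressure by running
    // the task on the submitting thread once the queue fills up.
    writeChunkExecutor = new ThreadPoolExecutor(
        numWriteChunkThreads, numWriteChunkThreads,
        100, TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(1024),
        new ThreadPoolExecutor.CallerRunsPolicy());
  }

  void start() {
    writeChunkExecutor.prestartAllCoreThreads(); // warm threads before serving
  }

  void stop() {
    writeChunkExecutor.shutdown(); // drain queued chunk writes, then exit
  }
}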
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6c2b669/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index e1da595..434f5c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -356,10 +356,18 @@
</property>
<property>
<name>dfs.container.ratis.segment.size</name>
- <value>134217728</value>
+ <value>1073741824</value>
<tag>OZONE, RATIS, PERFORMANCE</tag>
<description>The size of the raft segment used by Apache Ratis on datanodes.
- (128 MB by default)
+ (1 GB by default)
+ </description>
+ </property>
+ <property>
+ <name>dfs.container.ratis.segment.preallocated.size</name>
+ <value>134217728</value>
+ <tag>OZONE, RATIS, PERFORMANCE</tag>
+ <description>The size of the buffer which is preallocated for raft segment
+ used by Apache Ratis on datanodes. (128 MB by default)
</description>
</property>
<property>
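
The two properties above can be read back through Hadoop's Configuration API,
as XceiverServerRatis does. A short sketch, assuming only that the keys and
defaults match the patch; the surrounding class is hypothetical.

import org.apache.hadoop.conf.Configuration;

class SegmentConfigSketch {
  static void printSegmentSizes() {
    Configuration conf = new Configuration();
    // Defaults mirror ozone-default.xml after this change.
    int segmentSize = conf.getInt(
        "dfs.container.ratis.segment.size", 1073741824);             // 1 GB
    int preallocatedSize = conf.getInt(
        "dfs.container.ratis.segment.preallocated.size", 134217728); // 128 MB
    System.out.println("segment=" + segmentSize
        + " preallocated=" + preallocatedSize);
  }
}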