You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by lj...@apache.org on 2019/03/08 03:33:36 UTC
[hadoop] branch ozone-0.4 updated: HDDS-1208. ContainerStateMachine
should set chunk data as state machine data for ratis. Contributed by
Lokesh Jain.
This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch ozone-0.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/ozone-0.4 by this push:
new c334bc8 HDDS-1208. ContainerStateMachine should set chunk data as state machine data for ratis. Contributed by Lokesh Jain.
c334bc8 is described below
commit c334bc8baf982e179edbd2302fb41231b20fc590
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Wed Mar 6 17:00:37 2019 +0530
HDDS-1208. ContainerStateMachine should set chunk data as state machine data for ratis. Contributed by Lokesh Jain.
(cherry picked from commit 129fd5dd18dce0fba48561326a48082888bd6f83)
---
.../server/ratis/ContainerStateMachine.java | 73 +++++++---------------
1 file changed, 23 insertions(+), 50 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index ed7e099..0fc2d0d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -78,7 +78,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@@ -277,7 +276,7 @@ public class ContainerStateMachine extends BaseStateMachine {
public TransactionContext startTransaction(RaftClientRequest request)
throws IOException {
final ContainerCommandRequestProto proto =
- getRequestProto(request.getMessage().getContent());
+ getContainerCommandRequestProto(request.getMessage().getContent());
Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
try (Scope scope = TracingUtil
.importAndCreateScope(proto.getCmdType().name(), proto.getTraceID())) {
@@ -294,17 +293,6 @@ public class ContainerStateMachine extends BaseStateMachine {
}
if (proto.getCmdType() == Type.WriteChunk) {
final WriteChunkRequestProto write = proto.getWriteChunk();
- // create the state machine data proto
- final WriteChunkRequestProto dataWriteChunkProto =
- WriteChunkRequestProto
- .newBuilder(write)
- .build();
- ContainerCommandRequestProto dataContainerCommandProto =
- ContainerCommandRequestProto
- .newBuilder(proto)
- .setWriteChunk(dataWriteChunkProto)
- .build();
-
// create the log entry proto
final WriteChunkRequestProto commitWriteChunkProto =
WriteChunkRequestProto.newBuilder()
@@ -323,7 +311,7 @@ public class ContainerStateMachine extends BaseStateMachine {
.setClientRequest(request)
.setStateMachine(this)
.setServerRole(RaftPeerRole.LEADER)
- .setStateMachineData(dataContainerCommandProto.toByteString())
+ .setStateMachineData(write.getData())
.setLogData(commitContainerCommandProto.toByteString())
.build();
} else {
@@ -341,8 +329,8 @@ public class ContainerStateMachine extends BaseStateMachine {
return entryProto.getStateMachineEntry().getStateMachineData();
}
- private ContainerCommandRequestProto getRequestProto(ByteString request)
- throws InvalidProtocolBufferException {
+ private ContainerCommandRequestProto getContainerCommandRequestProto(
+ ByteString request) throws InvalidProtocolBufferException {
// TODO: We can avoid creating new builder and set pipeline Id if
// the client is already sending the pipeline id, then we just have to
// validate the pipeline Id.
@@ -353,7 +341,9 @@ public class ContainerStateMachine extends BaseStateMachine {
private ContainerCommandResponseProto dispatchCommand(
ContainerCommandRequestProto requestProto, DispatcherContext context) {
- LOG.trace("dispatch {}", requestProto);
+ LOG.trace("dispatch {} containerID={} pipelineID={} traceID={}",
+ requestProto.getCmdType(), requestProto.getContainerID(),
+ requestProto.getPipelineID(), requestProto.getTraceID());
if (isBlockTokenEnabled) {
try {
// ServerInterceptors intercepts incoming request and creates ugi.
@@ -432,8 +422,15 @@ public class ContainerStateMachine extends BaseStateMachine {
public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
try {
metrics.incNumWriteStateMachineOps();
- final ContainerCommandRequestProto requestProto =
- getRequestProto(getStateMachineData(entry.getStateMachineLogEntry()));
+ ContainerCommandRequestProto requestProto =
+ getContainerCommandRequestProto(
+ entry.getStateMachineLogEntry().getLogData());
+ WriteChunkRequestProto writeChunk =
+ WriteChunkRequestProto.newBuilder(requestProto.getWriteChunk())
+ .setData(getStateMachineData(entry.getStateMachineLogEntry()))
+ .build();
+ requestProto = ContainerCommandRequestProto.newBuilder(requestProto)
+ .setWriteChunk(writeChunk).build();
Type cmdType = requestProto.getCmdType();
// For only writeChunk, there will be writeStateMachineData call.
@@ -457,7 +454,7 @@ public class ContainerStateMachine extends BaseStateMachine {
try {
metrics.incNumReadStateMachineOps();
final ContainerCommandRequestProto requestProto =
- getRequestProto(request.getContent());
+ getContainerCommandRequestProto(request.getContent());
return CompletableFuture.completedFuture(runCommand(requestProto, null));
} catch (IOException e) {
metrics.incNumReadStateMachineFails();
@@ -507,34 +504,8 @@ public class ContainerStateMachine extends BaseStateMachine {
*/
private ByteString getCachedStateMachineData(Long logIndex, long term,
ContainerCommandRequestProto requestProto) throws ExecutionException {
- try {
- return reconstructWriteChunkRequest(
- stateMachineDataCache.get(logIndex, new Callable<ByteString>() {
- @Override
- public ByteString call() throws Exception {
- return readStateMachineData(requestProto, term, logIndex);
- }
- }), requestProto);
- } catch (ExecutionException e) {
- throw e;
- }
- }
-
- private ByteString reconstructWriteChunkRequest(ByteString data,
- ContainerCommandRequestProto requestProto) {
- WriteChunkRequestProto writeChunkRequestProto =
- requestProto.getWriteChunk();
- // reconstruct the write chunk request
- final WriteChunkRequestProto.Builder dataWriteChunkProto =
- WriteChunkRequestProto.newBuilder(writeChunkRequestProto)
- // adding the state machine data
- .setData(data);
-
- ContainerCommandRequestProto.Builder newStateMachineProto =
- ContainerCommandRequestProto.newBuilder(requestProto)
- .setWriteChunk(dataWriteChunkProto);
-
- return newStateMachineProto.build().toByteString();
+ return stateMachineDataCache.get(logIndex,
+ () -> readStateMachineData(requestProto, term, logIndex));
}
/**
@@ -568,7 +539,8 @@ public class ContainerStateMachine extends BaseStateMachine {
}
try {
final ContainerCommandRequestProto requestProto =
- getRequestProto(entry.getStateMachineLogEntry().getLogData());
+ getContainerCommandRequestProto(
+ entry.getStateMachineLogEntry().getLogData());
// readStateMachineData should only be called for "write" to Ratis.
Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto));
if (requestProto.getCmdType() == Type.WriteChunk) {
@@ -632,7 +604,8 @@ public class ContainerStateMachine extends BaseStateMachine {
try {
metrics.incNumApplyTransactionsOps();
ContainerCommandRequestProto requestProto =
- getRequestProto(trx.getStateMachineLogEntry().getLogData());
+ getContainerCommandRequestProto(
+ trx.getStateMachineLogEntry().getLogData());
Type cmdType = requestProto.getCmdType();
// Make sure that in write chunk, the user data is not set
if (cmdType == Type.WriteChunk) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org