You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/08/08 20:11:59 UTC
[18/50] [abbrv] hadoop git commit: HDDS-230. ContainerStateMachine
should implement readStateMachineData api to read data from Containers if
required during replication. Contributed by Mukul Kumar Singh.
HDDS-230. ContainerStateMachine should implement readStateMachineData api to read data from Containers if required during replication. Contributed by Mukul Kumar Singh.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/900c0e11
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/900c0e11
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/900c0e11
Branch: refs/heads/HDFS-12943
Commit: 900c0e114f391f4dbf21a0e08a63c2cf22659eb7
Parents: 2e4e02b
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Tue Aug 7 15:03:14 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Tue Aug 7 15:03:14 2018 +0530
----------------------------------------------------------------------
.../server/ratis/ContainerStateMachine.java | 142 ++++++++++++++++---
.../server/ratis/XceiverServerRatis.java | 10 +-
.../org/apache/hadoop/ozone/om/OMMetrics.java | 2 +-
hadoop-project/pom.xml | 2 +-
4 files changed, 129 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/900c0e11/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index c0dd0ba..15e991a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -19,20 +19,26 @@
package org.apache.hadoop.ozone.container.common.transport.server.ratis;
import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.shaded.com.google.protobuf
.InvalidProtocolBufferException;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Stage;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.WriteChunkRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ReadChunkRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ReadChunkResponseProto;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
-import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
@@ -96,16 +102,16 @@ public class ContainerStateMachine extends BaseStateMachine {
private final SimpleStateMachineStorage storage
= new SimpleStateMachineStorage();
private final ContainerDispatcher dispatcher;
- private ThreadPoolExecutor writeChunkExecutor;
+ private ThreadPoolExecutor chunkExecutor;
private final ConcurrentHashMap<Long, CompletableFuture<Message>>
writeChunkFutureMap;
private final ConcurrentHashMap<Long, CompletableFuture<Message>>
createContainerFutureMap;
ContainerStateMachine(ContainerDispatcher dispatcher,
- ThreadPoolExecutor writeChunkExecutor) {
+ ThreadPoolExecutor chunkExecutor) {
this.dispatcher = dispatcher;
- this.writeChunkExecutor = writeChunkExecutor;
+ this.chunkExecutor = chunkExecutor;
this.writeChunkFutureMap = new ConcurrentHashMap<>();
this.createContainerFutureMap = new ConcurrentHashMap<>();
}
@@ -117,9 +123,9 @@ public class ContainerStateMachine extends BaseStateMachine {
@Override
public void initialize(
- RaftPeerId id, RaftProperties properties, RaftStorage raftStorage)
+ RaftServer server, RaftGroupId id, RaftStorage raftStorage)
throws IOException {
- super.initialize(id, properties, raftStorage);
+ super.initialize(server, id, raftStorage);
storage.init(raftStorage);
// TODO handle snapshots
@@ -134,13 +140,13 @@ public class ContainerStateMachine extends BaseStateMachine {
getRequestProto(request.getMessage().getContent());
final SMLogEntryProto log;
- if (proto.getCmdType() == ContainerProtos.Type.WriteChunk) {
+ if (proto.getCmdType() == Type.WriteChunk) {
final WriteChunkRequestProto write = proto.getWriteChunk();
// create the state machine data proto
final WriteChunkRequestProto dataWriteChunkProto =
WriteChunkRequestProto
.newBuilder(write)
- .setStage(ContainerProtos.Stage.WRITE_DATA)
+ .setStage(Stage.WRITE_DATA)
.build();
ContainerCommandRequestProto dataContainerCommandProto =
ContainerCommandRequestProto
@@ -155,7 +161,7 @@ public class ContainerStateMachine extends BaseStateMachine {
.setChunkData(write.getChunkData())
// skipping the data field as it is
// already set in statemachine data proto
- .setStage(ContainerProtos.Stage.COMMIT_DATA)
+ .setStage(Stage.COMMIT_DATA)
.build();
ContainerCommandRequestProto commitContainerCommandProto =
ContainerCommandRequestProto
@@ -167,7 +173,7 @@ public class ContainerStateMachine extends BaseStateMachine {
.setData(commitContainerCommandProto.toByteString())
.setStateMachineData(dataContainerCommandProto.toByteString())
.build();
- } else if (proto.getCmdType() == ContainerProtos.Type.CreateContainer) {
+ } else if (proto.getCmdType() == Type.CreateContainer) {
log = SMLogEntryProto.newBuilder()
.setData(request.getMessage().getContent())
.setStateMachineData(request.getMessage().getContent())
@@ -185,11 +191,16 @@ public class ContainerStateMachine extends BaseStateMachine {
return ContainerCommandRequestProto.parseFrom(request);
}
- private Message runCommand(ContainerCommandRequestProto requestProto) {
+ private ContainerCommandResponseProto dispatchCommand(
+ ContainerCommandRequestProto requestProto) {
LOG.trace("dispatch {}", requestProto);
ContainerCommandResponseProto response = dispatcher.dispatch(requestProto);
LOG.trace("response {}", response);
- return () -> response.toByteString();
+ return response;
+ }
+
+ private Message runCommand(ContainerCommandRequestProto requestProto) {
+ return dispatchCommand(requestProto)::toByteString;
}
private CompletableFuture<Message> handleWriteChunk(
@@ -201,10 +212,10 @@ public class ContainerStateMachine extends BaseStateMachine {
CompletableFuture<Message> writeChunkFuture;
if (future != null) {
writeChunkFuture = future.thenApplyAsync(
- v -> runCommand(requestProto), writeChunkExecutor);
+ v -> runCommand(requestProto), chunkExecutor);
} else {
writeChunkFuture = CompletableFuture.supplyAsync(
- () -> runCommand(requestProto), writeChunkExecutor);
+ () -> runCommand(requestProto), chunkExecutor);
}
writeChunkFutureMap.put(entryIndex, writeChunkFuture);
return writeChunkFuture;
@@ -227,7 +238,7 @@ public class ContainerStateMachine extends BaseStateMachine {
try {
final ContainerCommandRequestProto requestProto =
getRequestProto(entry.getSmLogEntry().getStateMachineData());
- ContainerProtos.Type cmdType = requestProto.getCmdType();
+ Type cmdType = requestProto.getCmdType();
switch (cmdType) {
case CreateContainer:
return handleCreateContainer(requestProto);
@@ -253,6 +264,97 @@ public class ContainerStateMachine extends BaseStateMachine {
}
}
+ private LogEntryProto readStateMachineData(SMLogEntryProto smLogEntryProto,
+ ContainerCommandRequestProto requestProto) {
+ WriteChunkRequestProto writeChunkRequestProto =
+ requestProto.getWriteChunk();
+ // Assert that the stored log entry is for COMMIT_DATA; the WRITE_DATA
+ // stage is written through writeStateMachineData.
+ Preconditions.checkArgument(writeChunkRequestProto.getStage()
+ == Stage.COMMIT_DATA);
+
+ // prepare the chunk to be read
+ ReadChunkRequestProto.Builder readChunkRequestProto =
+ ReadChunkRequestProto.newBuilder()
+ .setBlockID(writeChunkRequestProto.getBlockID())
+ .setChunkData(writeChunkRequestProto.getChunkData());
+ ContainerCommandRequestProto dataContainerCommandProto =
+ ContainerCommandRequestProto.newBuilder(requestProto)
+ .setCmdType(Type.ReadChunk)
+ .setReadChunk(readChunkRequestProto)
+ .build();
+
+ // read the chunk
+ ContainerCommandResponseProto response =
+ dispatchCommand(dataContainerCommandProto);
+ ReadChunkResponseProto responseProto = response.getReadChunk();
+
+ // assert that the response has data in it.
+ Preconditions.checkNotNull(responseProto.getData());
+
+ // reconstruct the write chunk request
+ final WriteChunkRequestProto.Builder dataWriteChunkProto =
+ WriteChunkRequestProto.newBuilder(writeChunkRequestProto)
+ // adding the state machine data
+ .setData(responseProto.getData())
+ .setStage(Stage.WRITE_DATA);
+
+ ContainerCommandRequestProto.Builder newStateMachineProto =
+ ContainerCommandRequestProto.newBuilder(requestProto)
+ .setWriteChunk(dataWriteChunkProto);
+
+ return recreateLogEntryProto(smLogEntryProto,
+ newStateMachineProto.build().toByteString());
+ }
+
+ private LogEntryProto recreateLogEntryProto(SMLogEntryProto smLogEntryProto,
+ ByteString stateMachineData) {
+ // recreate the log entry
+ final SMLogEntryProto log =
+ SMLogEntryProto.newBuilder(smLogEntryProto)
+ .setStateMachineData(stateMachineData)
+ .build();
+ return LogEntryProto.newBuilder().setSmLogEntry(log).build();
+ }
+
+ /*
+ * This API is used by the leader while appending logs to a follower.
+ * It allows the leader to read the state machine data from the
+ * state machine implementation in case the cached state machine data has
+ * been evicted.
+ */
+ @Override
+ public CompletableFuture<LogEntryProto> readStateMachineData(
+ LogEntryProto entry) {
+ SMLogEntryProto smLogEntryProto = entry.getSmLogEntry();
+ if (!smLogEntryProto.getStateMachineData().isEmpty()) {
+ return CompletableFuture.completedFuture(entry);
+ }
+
+ try {
+ final ContainerCommandRequestProto requestProto =
+ getRequestProto(entry.getSmLogEntry().getData());
+ // readStateMachineData should only be called for "write" to Ratis.
+ Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto));
+
+ if (requestProto.getCmdType() == Type.WriteChunk) {
+ return CompletableFuture.supplyAsync(() ->
+ readStateMachineData(smLogEntryProto, requestProto),
+ chunkExecutor);
+ } else if (requestProto.getCmdType() == Type.CreateContainer) {
+ LogEntryProto log =
+ recreateLogEntryProto(smLogEntryProto, requestProto.toByteString());
+ return CompletableFuture.completedFuture(log);
+ } else {
+ throw new IllegalStateException("Cmd type:" + requestProto.getCmdType()
+ + " cannot have state machine data");
+ }
+ } catch (Exception e) {
+ LOG.error("unable to read stateMachineData:" + e);
+ return completeExceptionally(e);
+ }
+ }
+
/*
* ApplyTransaction calls in Ratis are sequential.
*/
@@ -261,9 +363,9 @@ public class ContainerStateMachine extends BaseStateMachine {
try {
ContainerCommandRequestProto requestProto =
getRequestProto(trx.getSMLogEntry().getData());
- ContainerProtos.Type cmdType = requestProto.getCmdType();
+ Type cmdType = requestProto.getCmdType();
- if (cmdType == ContainerProtos.Type.WriteChunk) {
+ if (cmdType == Type.WriteChunk) {
WriteChunkRequestProto write = requestProto.getWriteChunk();
// the data field has already been removed in start Transaction
Preconditions.checkArgument(!write.hasData());
@@ -274,7 +376,7 @@ public class ContainerStateMachine extends BaseStateMachine {
CompletableFuture.completedFuture(runCommand(requestProto)));
} else {
Message message = runCommand(requestProto);
- if (cmdType == ContainerProtos.Type.CreateContainer) {
+ if (cmdType == Type.CreateContainer) {
long containerID = requestProto.getContainerID();
createContainerFutureMap.remove(containerID).complete(message);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/900c0e11/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index b9c7cae..723b94ae 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -72,7 +72,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
private final int port;
private final RaftServer server;
- private ThreadPoolExecutor writeChunkExecutor;
+ private ThreadPoolExecutor chunkExecutor;
private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir,
ContainerDispatcher dispatcher, Configuration conf) throws IOException {
@@ -117,13 +117,13 @@ public final class XceiverServerRatis implements XceiverServerSpi {
setRequestTimeout(serverProperties, clientRequestTimeout,
serverRequestTimeout);
- writeChunkExecutor =
+ chunkExecutor =
new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
100, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1024),
new ThreadPoolExecutor.CallerRunsPolicy());
ContainerStateMachine stateMachine =
- new ContainerStateMachine(dispatcher, writeChunkExecutor);
+ new ContainerStateMachine(dispatcher, chunkExecutor);
this.server = RaftServer.newBuilder()
.setServerId(RatisHelper.toRaftPeerId(dd))
.setGroup(RatisHelper.emptyRaftGroup())
@@ -225,14 +225,14 @@ public final class XceiverServerRatis implements XceiverServerSpi {
public void start() throws IOException {
LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
server.getId(), getIPCPort());
- writeChunkExecutor.prestartAllCoreThreads();
+ chunkExecutor.prestartAllCoreThreads();
server.start();
}
@Override
public void stop() {
try {
- writeChunkExecutor.shutdown();
+ chunkExecutor.shutdown();
server.close();
} catch (IOException e) {
throw new RuntimeException(e);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/900c0e11/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
index 2d04452..b8cfc97 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -88,7 +88,7 @@ public class OMMetrics {
public static OMMetrics create() {
MetricsSystem ms = DefaultMetricsSystem.instance();
return ms.register(SOURCE_NAME,
- "Oozne Manager Metrics",
+ "Ozone Manager Metrics",
new OMMetrics());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/900c0e11/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 387a3da..7603842 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -97,7 +97,7 @@
<ldap-api.version>1.0.0-M33</ldap-api.version>
<!-- Apache Ratis version -->
- <ratis.version>0.1.1-alpha-d7d7061-SNAPSHOT</ratis.version>
+ <ratis.version>0.3.0-c242317-SNAPSHOT</ratis.version>
<jcache.version>1.0-alpha-1</jcache.version>
<ehcache.version>3.3.1</ehcache.version>
<hikari.version>2.4.12</hikari.version>
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org