You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2020/02/19 09:11:08 UTC

[GitHub] [hadoop-ozone] elek commented on a change in pull request #542: HDDS-2974. Create Freon test to test isolated Ratis Follower

elek commented on a change in pull request #542: HDDS-2974. Create Freon test to test isolated Ratis Follower
URL: https://github.com/apache/hadoop-ozone/pull/542#discussion_r381159886
 
 

 ##########
 File path: hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/FollowerAppendLogEntryGenerator.java
 ##########
 @@ -0,0 +1,470 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.freon;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.DatanodeBlockID;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
+
+import com.codahale.metrics.Timer;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.concurrent.TimedSemaphore;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
+import org.apache.ratis.proto.RaftProtos.RaftGroupIdProto;
+import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
+import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
+import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.proto.RaftProtos.StateMachineEntryProto;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
+import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto.Builder;
+import org.apache.ratis.proto.RaftProtos.TermIndexProto;
+import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc;
+import org.apache.ratis.proto.grpc.RaftServerProtocolServiceGrpc.RaftServerProtocolServiceStub;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
+import org.apache.ratis.thirdparty.io.grpc.netty.NegotiationType;
+import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder;
+import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Option;
+
+/**
+ * Freon test to test one single datanode with a fake leader (this test).
+ * <p>
+ * To use this test, start one datanode, but set
+ * OZONE_DATANODE_STANDALONE_TEST=follower first.
+ * <p>
+ * After that this test can be started. All the metadata/storage should be
+ * cleaned up before restarting the test.
+ */
+@Command(name = "falg",
+    aliases = "follower-append-log-generator",
+    description = "Generate append log entries to a follower server",
+    versionProvider = HddsVersionProvider.class,
+    mixinStandardHelpOptions = true,
+    showDefaultValues = true)
+public class FollowerAppendLogEntryGenerator extends BaseAppendLogGenerator
+    implements Callable<Void>, StreamObserver<AppendEntriesReplyProto> {
+
+  public static final String FAKE_LEADER_ADDDRESS = "localhost:1234";
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FollowerAppendLogEntryGenerator.class);
+
+  private static final String FAKE_LEADER_ID =
+      "ffffffff-df33-4a20-8e1f-ffffffff6be5";
+
+  @Option(names = {"-l", "--pipeline"},
+      description = "Pipeline to use. By default the first RATIS/THREE "
+          + "pipeline will be used.",
+      defaultValue = "96714307-4bd7-42b5-a65d-e1b13b4ca5c0")
+  private String pipelineId = "96714307-4bd7-42b5-a65d-e1b13b4ca5c0";
+
+  @Option(names = {"-s", "--size"},
+      description = "Size of the generated chunks (in bytes)",
+      defaultValue = "1024")
+  private int chunkSize;
+
+  @Option(names = {"-b", "--batching"},
+      description = "Number of write chunks requests in one AppendLogEntry",
+      defaultValue = "2")
+  private int batching;
+
+  @Option(names = {"-i", "--next-index"},
+      description = "The next index in the term 2 to continue a test. (If "
+          + "zero, a new ratis ring will be intialized with configureGroup "
+          + "call and vote)",
+      defaultValue = "0")
+  private long nextIndex;
+
+  @Option(names = {"--rate-limit"},
+      description = "Maximum number of requests per second (if bigger than 0)",
+      defaultValue = "0")
+  private int rateLimit;
+
+  @Option(names = {"--inflight-limit"},
+      description = "Maximum in-flight messages",
+      defaultValue = "10")
+  private int inflightLimit;
+
+  private TimedSemaphore rateLimiter;
+
+  private RaftPeerProto requestor;
+
+  private long term = 2L;
+
+  private RaftServerProtocolServiceStub stub;
+
+  private Random callIdRandom = new Random();
+
+  private ByteString dataToWrite;
+
+  private Timer timer;
+
+  private BlockingQueue<Long> inFlightMessages;
+
+  private StreamObserver<AppendEntriesRequestProto> sender;
+
+  /**
+   * Runs the follower append-log test against a single datanode, acting as
+   * a fake Ratis leader.
+   * <p>
+   * Prepares the chunk payload, the fake leader identity and a plaintext gRPC
+   * appendEntries stream to {@code serverAddress}. When {@code nextIndex} is
+   * 0 it also configures a new group, requests a vote and sends the initial
+   * log entry. Afterwards it hands {@link #sendAppendLogEntryRequest} to
+   * {@code runTests}.
+   *
+   * @return always {@code null}
+   * @throws Exception if a setup step or the vote request fails, or if the
+   *     datanode does not grant its vote to the fake leader
+   */
+  @Override
+  public Void call() throws Exception {
+    // Bounded queue of outstanding call ids: put() blocks once
+    // inflightLimit messages are in flight. NOTE(review): presumably drained
+    // as replies arrive in onNext (not visible here).
+    inFlightMessages = new LinkedBlockingQueue<>(inflightLimit);
+
+    timer = getMetrics().timer("append-entry");
+    // Random ASCII payload of chunkSize characters (one UTF-8 byte each).
+    byte[] data = RandomStringUtils.randomAscii(chunkSize)
+        .getBytes(StandardCharsets.UTF_8);
+
+    dataToWrite = ByteString.copyFrom(data);
+
+    OzoneConfiguration conf = createOzoneConfiguration();
+
+    // NOTE(review): defined outside this view; presumably resolves the
+    // target datanode's Ratis server id from conf/metadata.
+    setServerIdFromFile(conf);
+
+    Preconditions.assertTrue(getThreadNo() == 1,
+        "This test should be executed from one thread");
+
+    // The Raft peer identity (id + address) that freon presents as the fake
+    // leader.
+    requestor = RaftPeerProto.newBuilder()
+        .setId(RaftPeerId.valueOf(FAKE_LEADER_ID).toByteString())
+        .setAddress(FAKE_LEADER_ADDDRESS)
+        .build();
+
+    // Plaintext (no TLS) gRPC channel to serverAddress.
+    // NOTE(review): this channel is never shut down in this method.
+    NettyChannelBuilder channelBuilder =
+        NettyChannelBuilder.forTarget(serverAddress);
+    channelBuilder.negotiationType(NegotiationType.PLAINTEXT);
+    ManagedChannel build = channelBuilder.build();
+    stub = RaftServerProtocolServiceGrpc.newStub(build);
+
+    // Optional throttle: rateLimit permits per one-second period. Presumably
+    // acquired on the send path (not visible here).
+    if (rateLimit != 0) {
+      rateLimiter = new TimedSemaphore(1, TimeUnit.SECONDS, rateLimit);
+    }
+
+    init();
+
+    // One bidirectional appendEntries stream is shared by all requests; the
+    // replies are delivered to this object's StreamObserver callbacks.
+    sender = stub.appendEntries(this);
+
+    if (nextIndex == 0) {
+      // First: configure a new Ratis group (one follower, one fake leader
+      // (freon)).
+      configureGroup();
+
+      // NOTE(review): the timeout is 1000 *seconds*; confirm intended unit.
+      RequestVoteReplyProto vote = requestVote().get(1000, TimeUnit.SECONDS);
+      LOG.info("Datanode answered to the vote request: {}", vote);
+      if (!vote.getServerReply().getSuccess()) {
+        throw new RuntimeException(
+            "Datanode didn't vote to the fake freon leader.");
+      }
+
+      // Send the first appendEntry. This one is special as it initializes
+      // the log. The call id is registered before sending so the reply can
+      // be matched.
+      long callId = callIdRandom.nextLong();
+      inFlightMessages.put(callId);
+      sender.onNext(createInitialLogEntry(callId));
+
+      nextIndex = 1L;
+    }
+
+    // Generate as many append entries as requested (runTests is defined
+    // outside this view).
+    runTests(this::sendAppendLogEntryRequest);
+
+    // NOTE(review): not in a finally block, so skipped if runTests throws.
+    if (rateLimiter != null) {
+      rateLimiter.shutdown();
+    }
+
+    return null;
+  }
+
+  /**
+   * Seend a new HB and record the call id to handle response.
+   */
+  private void sendAppendLogEntryRequest(long sequence) {
+    timer.time(() -> {
+      AppendEntriesReplyProto replyProto = null;
+      try {
+        long callId = callIdRandom.nextLong();
+        inFlightMessages.put(callId);
+        sender.onNext(createAppendLogEntry(sequence, callId));
 
 Review comment:
   Earlier I used a synchronous call and stored the *reply*. The result of `createAppendLogEntry` is the request, and there is no *reply* here because the same observer (`sender`) is used for all the requests.
   
   I removed the old, unused code (logging just the exception)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-issues-help@hadoop.apache.org