You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ms...@apache.org on 2018/02/05 16:37:01 UTC

incubator-ratis git commit: RATIS-206. Change StateMachine.query to pass and return Message. Contributed by Tsz Wo Nicholas Sze.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 1beef8741 -> 2d5365683


RATIS-206. Change StateMachine.query to pass and return Message. Contributed by Tsz Wo Nicholas Sze.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/2d536568
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/2d536568
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/2d536568

Branch: refs/heads/master
Commit: 2d53656837d9d56792bc62a062dc1481aaab3024
Parents: 1beef87
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Mon Feb 5 22:06:36 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Mon Feb 5 22:06:36 2018 +0530

----------------------------------------------------------------------
 .../ratis/client/impl/ClientProtoUtils.java     | 16 +----------
 .../java/org/apache/ratis/protocol/Message.java | 26 +++++++++++++++++
 .../java/org/apache/ratis/util/StringUtils.java |  4 ++-
 .../arithmetic/ArithmeticStateMachine.java      | 15 +++-------
 .../examples/arithmetic/AssignmentMessage.java  |  4 +--
 .../arithmetic/expression/Expression.java       | 10 +++----
 .../examples/filestore/FileStoreClient.java     |  2 +-
 .../filestore/FileStoreStateMachine.java        | 16 +++++------
 .../ratis/server/impl/RaftServerImpl.java       |  3 +-
 .../apache/ratis/statemachine/StateMachine.java |  4 +--
 .../statemachine/impl/BaseStateMachine.java     |  8 ++----
 .../org/apache/ratis/RaftExceptionBaseTest.java |  4 +--
 .../SimpleStateMachine4Testing.java             | 30 ++++++++++++++------
 13 files changed, 78 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index a7aaf54..861dba6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -209,21 +209,7 @@ public interface ClientProtoUtils {
   }
 
   static Message toMessage(final ClientMessageEntryProto p) {
-    return toMessage(p.getContent());
-  }
-
-  static Message toMessage(final ByteString bytes) {
-    return new Message() {
-      @Override
-      public ByteString getContent() {
-        return bytes;
-      }
-
-      @Override
-      public String toString() {
-        return StringUtils.bytes2HexShortString(getContent());
-      }
-    };
+    return Message.valueOf(p.getContent());
   }
 
   static ClientMessageEntryProto.Builder toClientMessageEntryProtoBuilder(ByteString message) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
index 77ef267..4efd29c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
@@ -18,11 +18,37 @@
 package org.apache.ratis.protocol;
 
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.StringUtils;
+
+import java.util.function.Supplier;
 
 /**
  * The information clients append to the raft ring.
  */
 public interface Message {
+  static Message valueOf(ByteString bytes, Supplier<String> stringSupplier) {
+    return new Message() {
+      private final MemoizedSupplier<String> memoized = MemoizedSupplier.valueOf(stringSupplier);
+
+      @Override
+      public ByteString getContent() {
+        return bytes;
+      }
+
+      @Override
+      public String toString() {
+        return memoized.get();
+      }
+    };
+  }
+
+  static Message valueOf(ByteString bytes) {
+    return valueOf(bytes, () -> "Message:" + StringUtils.bytes2HexShortString(bytes));
+  }
+
+  Message EMPTY = valueOf(ByteString.EMPTY);
+
   /**
    * @return the content of the message
    */

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
index 0212a48..710a0a2 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
@@ -67,7 +67,9 @@ public class StringUtils {
 
   public static String bytes2HexShortString(ByteString bytes) {
     final int size = bytes.size();
-    if (size > 10) {
+    if (size == 0) {
+      return "<EMPTY>";
+    } else if (size > 10) {
       // return only the first 10 bytes
       return bytes2HexString(bytes.substring(0, 10)) + "...(size=" + size + ")";
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
index 0b08b89..fafab6e 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
@@ -20,8 +20,6 @@ package org.apache.ratis.examples.arithmetic;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.examples.arithmetic.expression.Expression;
 import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -135,19 +133,15 @@ public class ArithmeticStateMachine extends BaseStateMachine {
   }
 
   @Override
-  public CompletableFuture<RaftClientReply> query(
-      RaftClientRequest request) {
-    final Expression q = Expression.Utils.bytes2Expression(
-        request.getMessage().getContent().toByteArray(), 0);
+  public CompletableFuture<Message> query(Message request) {
+    final Expression q = Expression.Utils.bytes2Expression(request.getContent().toByteArray(), 0);
     final Double result;
     try(final AutoCloseableLock readLock = readLock()) {
       result = q.evaluate(variables);
     }
     final Expression r = Expression.Utils.double2Expression(result);
     LOG.debug("QUERY: {} = {}", q, r);
-    final RaftClientReply reply = new RaftClientReply(request,
-        Expression.Utils.toMessage(r));
-    return CompletableFuture.completedFuture(reply);
+    return CompletableFuture.completedFuture(Expression.Utils.toMessage(r));
   }
 
   @Override
@@ -158,8 +152,7 @@ public class ArithmeticStateMachine extends BaseStateMachine {
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
     final LogEntryProto entry = trx.getLogEntry();
-    final AssignmentMessage assignment = new AssignmentMessage(
-        () -> entry.getSmLogEntry().getData());
+    final AssignmentMessage assignment = new AssignmentMessage(entry.getSmLogEntry().getData());
 
     final long index = entry.getIndex();
     final Double result;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/AssignmentMessage.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/AssignmentMessage.java b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/AssignmentMessage.java
index e4e7ca8..097e792 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/AssignmentMessage.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/AssignmentMessage.java
@@ -43,8 +43,8 @@ public class AssignmentMessage implements Message, Evaluable {
     expression = Expression.Utils.bytes2Expression(buf, offset + variable.length());
   }
 
-  public AssignmentMessage(Message message) {
-    this(message.getContent().toByteArray(), 0);
+  public AssignmentMessage(ByteString bytes) {
+    this(bytes.toByteArray(), 0);
   }
 
   public Variable getVariable() {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/expression/Expression.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/expression/Expression.java b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/expression/Expression.java
index 12818f7..f212ce6 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/expression/Expression.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/expression/Expression.java
@@ -46,12 +46,10 @@ public interface Expression extends Evaluable {
 
   class Utils {
     public static Message toMessage(final Expression e) {
-      return () -> {
-        final byte[] buf = new byte[e.length()];
-        final int length = e.toBytes(buf, 0);
-        Preconditions.assertTrue(length == buf.length);
-        return toByteString(buf);
-      };
+      final byte[] buf = new byte[e.length()];
+      final int length = e.toBytes(buf, 0);
+      Preconditions.assertTrue(length == buf.length);
+      return Message.valueOf(toByteString(buf), () -> "Message:" + e);
     }
 
     public static Expression double2Expression(Double d) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
index 380f315..de794c2 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
@@ -57,7 +57,7 @@ public class FileStoreClient implements Closeable {
   static ByteString send(
       ByteString request, CheckedFunction<Message, RaftClientReply, IOException> sendFunction)
       throws IOException {
-    final RaftClientReply reply = sendFunction.apply(() -> request);
+    final RaftClientReply reply = sendFunction.apply(Message.valueOf(request));
     final StateMachineException sme = reply.getStateMachineException();
     if (sme != null) {
       throw new IOException("Failed to send request " + request, sme);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
index 5ecb387..c1b41c6 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
@@ -20,7 +20,6 @@ package org.apache.ratis.examples.filestore;
 import org.apache.ratis.conf.ConfUtils;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.storage.RaftStorage;
@@ -72,17 +71,17 @@ public class FileStoreStateMachine extends BaseStateMachine {
   }
 
   @Override
-  public CompletableFuture<RaftClientReply> query(RaftClientRequest request) {
+  public CompletableFuture<Message> query(Message request) {
     final ReadRequestProto proto;
     try {
-      proto = ReadRequestProto.parseFrom(request.getMessage().getContent());
+      proto = ReadRequestProto.parseFrom(request.getContent());
     } catch (InvalidProtocolBufferException e) {
       return FileStoreCommon.completeExceptionally("Failed to parse " + request, e);
     }
 
     final String path = proto.getPath().toStringUtf8();
     return files.read(path, proto.getOffset(), proto.getLength())
-        .thenApply(reply -> new RaftClientReply(request, () -> reply.toByteString()));
+        .thenApply(reply -> Message.valueOf(reply.toByteString()));
   }
 
   @Override
@@ -162,13 +161,14 @@ public class FileStoreStateMachine extends BaseStateMachine {
       long index, WriteRequestHeaderProto header, int size) {
     final String path = header.getPath().toStringUtf8();
     return files.submitCommit(index, path, header.getClose(), header.getOffset(), size)
-        .thenApply(reply -> () -> reply.toByteString());
+        .thenApply(reply -> Message.valueOf(reply.toByteString()));
   }
 
   private CompletableFuture<Message> delete(long index, DeleteRequestProto request) {
     final String path = request.getPath().toStringUtf8();
-    return files.delete(index, path).thenApply(resolved -> () ->
-        DeleteReplyProto.newBuilder().setResolvedPath(
-            FileStoreCommon.toByteString(resolved)).build().toByteString());
+    return files.delete(index, path).thenApply(resolved ->
+        Message.valueOf(DeleteReplyProto.newBuilder().setResolvedPath(
+            FileStoreCommon.toByteString(resolved)).build().toByteString(),
+            () -> "Message:" + resolved));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index a619909..6f8f603 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -462,7 +462,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     if (request.isReadOnly()) {
       // TODO: We might not be the leader anymore by the time this completes.
       // See the RAFT paper section 8 (last part)
-      return stateMachine.query(request);
+      return stateMachine.query(request.getMessage())
+          .thenApply(r -> new RaftClientReply(request, r));
     }
 
     // query the retry cache

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index cbedb08..e2aaf29 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -19,7 +19,6 @@ package org.apache.ratis.statemachine;
 
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -116,9 +115,8 @@ public interface StateMachine extends Closeable {
 
   /**
    * Query the state machine. The request must be read-only.
-   * TODO: extend RaftClientRequest to have a read-only request subclass.
    */
-  CompletableFuture<RaftClientReply> query(RaftClientRequest request);
+  CompletableFuture<Message> query(Message request);
 
   /**
    * Validate/pre-process the incoming update request in the state machine.

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index 8f0b56a..201eff7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -20,7 +20,6 @@ package org.apache.ratis.statemachine.impl;
 
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.impl.RaftConfiguration;
@@ -107,8 +106,8 @@ public class BaseStateMachine implements StateMachine {
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
     // return the same message contained in the entry
-    Message msg = () -> trx.getLogEntry().getSmLogEntry().getData();
-    return CompletableFuture.completedFuture(msg);
+    return CompletableFuture.completedFuture(
+        Message.valueOf(trx.getLogEntry().getSmLogEntry().getData()));
   }
 
   @Override
@@ -159,8 +158,7 @@ public class BaseStateMachine implements StateMachine {
   }
 
   @Override
-  public CompletableFuture<RaftClientReply> query(
-      RaftClientRequest request) {
+  public CompletableFuture<Message> query(Message request) {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index 7ad5bd2..e69f35d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -175,11 +175,11 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
     // Create client using another group
     try(RaftClient client = cluster.createClient(anotherGroup)) {
       testFailureCase("send(..) with client group being different from the server group",
-          () -> client.send(() -> ByteString.EMPTY),
+          () -> client.send(Message.EMPTY),
           GroupMismatchException.class);
 
       testFailureCase("sendReadOnly(..) with client group being different from the server group",
-          () -> client.sendReadOnly(() -> ByteString.EMPTY),
+          () -> client.sendReadOnly(Message.EMPTY),
           GroupMismatchException.class);
 
       testFailureCase("setConfiguration(..) with client group being different from the server group",

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 86b7b66..261b9ef 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -21,7 +21,6 @@ import org.apache.ratis.RaftTestUtil.SimpleMessage;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.io.MD5Hash;
 import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
@@ -31,16 +30,14 @@ import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.LogInputStream;
 import org.apache.ratis.server.storage.LogOutputStream;
 import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
 import org.apache.ratis.statemachine.impl.TransactionContextImpl;
-import org.apache.ratis.util.Daemon;
-import org.apache.ratis.util.LifeCycle;
-import org.apache.ratis.util.MD5FileUtil;
-import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -216,11 +213,26 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
     }
   }
 
+  /**
+   * Query the n-th log entry.
+   * @param request an index represented in a UTF-8 String, or an empty message.
+   * @return a completed future of the n-th log entry,
+   *         where n is the last applied index if the request is empty,
+   *         otherwise, n is the index represented in the request.
+   */
   @Override
-  public CompletableFuture<RaftClientReply> query(
-      RaftClientRequest request) {
-    return CompletableFuture.completedFuture(
-        new RaftClientReply(request, new SimpleMessage("query success")));
+  public CompletableFuture<Message> query(Message request) {
+    final ByteString bytes = request.getContent();
+    try {
+      final long index = bytes.isEmpty()? getLastAppliedTermIndex().getIndex()
+          : Long.parseLong(bytes.toStringUtf8());
+      LOG.info("query log index " + index);
+      final LogEntryProto entry = list.get(Math.toIntExact(index));
+      return CompletableFuture.completedFuture(Message.valueOf(entry.toByteString()));
+    } catch (Exception e) {
+      LOG.warn("Failed request " + request, e);
+      return JavaUtils.completeExceptionally(e);
+    }
   }
 
   @Override