You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2019/03/05 22:05:26 UTC

[incubator-ratis] branch master updated: RATIS-493. PeerProxyMap should throw AlreadyClosedException for CLOSING and CLOSED states.

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 59aa6f3  RATIS-493. PeerProxyMap should throw AlreadyClosedException for CLOSING and CLOSED states.
59aa6f3 is described below

commit 59aa6f3c82440d6b9af06690d6ddf2cb5922ea6e
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Mar 5 14:05:02 2019 -0800

    RATIS-493. PeerProxyMap should throw AlreadyClosedException for CLOSING and CLOSED states.
---
 .../org/apache/ratis/client/RaftClientRpc.java     | 12 +++--
 .../ratis/client/impl/RaftClientRpcWithProxy.java  |  6 +--
 .../main/java/org/apache/ratis/util/IOUtils.java   | 58 +++++++++++++++-------
 .../java/org/apache/ratis/util/PeerProxyMap.java   |  8 ++-
 .../java/org/apache/ratis/util/ProtoUtils.java     | 18 +------
 .../org/apache/ratis/server/impl/LogAppender.java  |  2 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   |  2 +-
 .../server/simulation/SimulatedClientRpc.java      |  8 +--
 8 files changed, 62 insertions(+), 52 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
index 505ae7e..8fb0987 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -39,6 +39,12 @@ public interface RaftClientRpc extends Closeable {
   /** Add the information of the given raft servers */
   void addServers(Iterable<RaftPeer> servers);
 
-  /** Handle the given exception.  For example, try reconnecting. */
-  void handleException(RaftPeerId serverId, Exception e, boolean reconnect);
+  /**
+   * Handle the given throwable.  For example, try reconnecting.
+   *
+   * @return true if the given throwable is handled; otherwise, the call is a no-op and false is returned.
+   */
+  default boolean handleException(RaftPeerId serverId, Throwable t, boolean reconnect) {
+    return false;
+  }
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
index 15fa061..b5fbf48 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -43,8 +43,8 @@ public abstract class RaftClientRpcWithProxy<PROXY extends Closeable>
   }
 
   @Override
-  public void handleException(RaftPeerId serverId, Exception e, boolean reconnect) {
-    getProxies().handleException(serverId, e, reconnect);
+  public boolean handleException(RaftPeerId serverId, Throwable t, boolean reconnect) {
+    return getProxies().handleException(serverId, t, reconnect);
   }
 
   @Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
index 1f81170..f164eea 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
@@ -1,33 +1,33 @@
 /*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
-
 package org.apache.ratis.util;
 
+import org.apache.ratis.protocol.AlreadyClosedException;
 import org.apache.ratis.protocol.TimeoutIOException;
 import org.slf4j.Logger;
 
+import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InterruptedIOException;
+import java.io.ObjectInputStream;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
@@ -88,9 +88,10 @@ public interface IOUtils {
     }
   }
 
-  static boolean shouldReconnect(Exception e) {
+  static boolean shouldReconnect(Throwable e) {
     return ReflectionUtils.isInstance(e,
-        SocketException.class, SocketTimeoutException.class, ClosedChannelException.class, EOFException.class);
+        SocketException.class, SocketTimeoutException.class, ClosedChannelException.class, EOFException.class,
+        AlreadyClosedException.class);
   }
 
   static void readFully(InputStream in, int buffSize) throws IOException {
@@ -184,4 +185,23 @@ public interface IOUtils {
       }
     }
   }
+
+  /** Serialize the given object to a byte array using {@link java.io.ObjectOutputStream#writeObject(Object)}. */
+  static byte[] object2Bytes(Object obj) {
+    return ProtoUtils.writeObject2ByteString(obj).toByteArray();
+  }
+
+  static <T> T bytes2Object(byte[] bytes, Class<T> clazz) {
+    return readObject(new ByteArrayInputStream(bytes), clazz);
+  }
+
+  static <T> T readObject(InputStream in, Class<T> clazz) {
+    try(ObjectInputStream oin = new ObjectInputStream(in)) {
+      return clazz.cast(oin.readObject());
+    } catch (IOException | ClassNotFoundException e) {
+      throw new IllegalStateException("Failed to read an object.", e);
+    } catch (ClassCastException e) {
+      throw new IllegalStateException("Failed to cast the object to " + clazz, e);
+    }
+  }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index c100d44..813fac7 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.util;
 
+import org.apache.ratis.protocol.AlreadyClosedException;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.util.function.CheckedFunction;
@@ -54,7 +55,7 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable {
           if (proxy == null) {
             final LifeCycle.State current = lifeCycle.getCurrentState();
             if (current.isOneOf(LifeCycle.State.CLOSING, LifeCycle.State.CLOSED)) {
-              throw new IOException(name + " is already " + current);
+              throw new AlreadyClosedException(name + " is already " + current);
             }
             lifeCycle.startAndTransition(
                 () -> proxy = createProxy.apply(peer), IOException.class);
@@ -126,10 +127,13 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable {
     }
   }
 
-  public void handleException(RaftPeerId serverId, Exception e, boolean reconnect) {
+  /** @return true if the given throwable is handled; otherwise, the call is a no-op and false is returned. */
+  public boolean handleException(RaftPeerId serverId, Throwable e, boolean reconnect) {
     if (reconnect || IOUtils.shouldReconnect(e)) {
       resetProxy(serverId);
+      return true;
     }
+    return false;
   }
 
   public PROXY createProxyImpl(RaftPeer peer) throws IOException {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 35ee012..b1bfdbe 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,7 +17,6 @@
  */
 package org.apache.ratis.util;
 
-import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.proto.RaftProtos.RaftGroupIdProto;
 import org.apache.ratis.proto.RaftProtos.RaftGroupProto;
@@ -33,7 +32,6 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.com.google.protobuf.ServiceException;
 
 import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.Collection;
 import java.util.Iterator;
@@ -54,14 +52,7 @@ public interface ProtoUtils {
   }
 
   static Object toObject(ByteString bytes) {
-    try(final ObjectInputStream in = new ObjectInputStream(bytes.newInput())) {
-      return in.readObject();
-    } catch (IOException e) {
-      throw new IllegalStateException(
-          "Unexpected IOException when reading an object from a ByteString.", e);
-    } catch (ClassNotFoundException e) {
-      throw new IllegalStateException(e);
-    }
+    return IOUtils.readObject(bytes.newInput(), Object.class);
   }
 
   static ByteString toByteString(String string) {
@@ -167,9 +158,4 @@ public interface ProtoUtils {
   static String toString(RequestVoteReplyProto proto) {
     return toString(proto.getServerReply()) + "-t" + proto.getTerm();
   }
-  static String toString(AppendEntriesReplyProto proto) {
-    return toString(proto.getServerReply()) + "-t" + proto.getTerm()
-        + ", nextIndex=" + proto.getNextIndex()
-        + ", result: " + proto.getResult();
-  }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index f9c8fc5..1892410 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -420,7 +420,7 @@ public class LogAppender {
           if (nextIndex < oldNextIndex) {
             throw new IllegalStateException("nextIndex=" + nextIndex
                 + " < oldNextIndex=" + oldNextIndex
-                + ", reply=" + ProtoUtils.toString(reply));
+                + ", reply=" + ServerProtoUtils.toString(reply));
           }
 
           if (nextIndex > oldNextIndex) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 494a596..a9ba7b2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -885,7 +885,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
             leaderId, getId(), groupId, currentTerm, followerCommit, nextIndex, NOT_LEADER, callId);
         if (LOG.isDebugEnabled()) {
           LOG.debug("{}: Not recognize {} (term={}) as leader, state: {} reply: {}",
-              getId(), leaderId, leaderTerm, state, ProtoUtils.toString(reply));
+              getId(), leaderId, leaderTerm, state, ServerProtoUtils.toString(reply));
         }
         return CompletableFuture.completedFuture(reply);
       }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
index c9c9f9b..ea47e03 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,7 +21,6 @@ import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
 
 class SimulatedClientRpc
     extends SimulatedRequestReply<RaftClientRequest, RaftClientReply>
@@ -36,11 +35,6 @@ class SimulatedClientRpc
   }
 
   @Override
-  public void handleException(RaftPeerId serverId, Exception e, boolean reconnect) {
-    // do nothing
-  }
-
-  @Override
   public void close() {
     // do nothing
   }