You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2019/03/05 22:05:26 UTC
[incubator-ratis] branch master updated: RATIS-493. PeerProxyMap
should throw AlreadyClosedException for CLOSING and CLOSED states.
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 59aa6f3 RATIS-493. PeerProxyMap should throw AlreadyClosedException for CLOSING and CLOSED states.
59aa6f3 is described below
commit 59aa6f3c82440d6b9af06690d6ddf2cb5922ea6e
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Mar 5 14:05:02 2019 -0800
RATIS-493. PeerProxyMap should throw AlreadyClosedException for CLOSING and CLOSED states.
---
.../org/apache/ratis/client/RaftClientRpc.java | 12 +++--
.../ratis/client/impl/RaftClientRpcWithProxy.java | 6 +--
.../main/java/org/apache/ratis/util/IOUtils.java | 58 +++++++++++++++-------
.../java/org/apache/ratis/util/PeerProxyMap.java | 8 ++-
.../java/org/apache/ratis/util/ProtoUtils.java | 18 +------
.../org/apache/ratis/server/impl/LogAppender.java | 2 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 2 +-
.../server/simulation/SimulatedClientRpc.java | 8 +--
8 files changed, 62 insertions(+), 52 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
index 505ae7e..8fb0987 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -39,6 +39,12 @@ public interface RaftClientRpc extends Closeable {
/** Add the information of the given raft servers */
void addServers(Iterable<RaftPeer> servers);
- /** Handle the given exception. For example, try reconnecting. */
- void handleException(RaftPeerId serverId, Exception e, boolean reconnect);
+ /**
+ * Handle the given throwable. For example, try reconnecting.
+ *
+ * @return true if the given throwable is handled; otherwise, the call is an no-op, return false.
+ */
+ default boolean handleException(RaftPeerId serverId, Throwable t, boolean reconnect) {
+ return false;
+ }
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
index 15fa061..b5fbf48 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientRpcWithProxy.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -43,8 +43,8 @@ public abstract class RaftClientRpcWithProxy<PROXY extends Closeable>
}
@Override
- public void handleException(RaftPeerId serverId, Exception e, boolean reconnect) {
- getProxies().handleException(serverId, e, reconnect);
+ public boolean handleException(RaftPeerId serverId, Throwable t, boolean reconnect) {
+ return getProxies().handleException(serverId, t, reconnect);
}
@Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
index 1f81170..f164eea 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
@@ -1,33 +1,33 @@
/*
- * *
- * * Licensed to the Apache Software Foundation (ASF) under one
- * * or more contributor license agreements. See the NOTICE file
- * * distributed with this work for additional information
- * * regarding copyright ownership. The ASF licenses this file
- * * to you under the Apache License, Version 2.0 (the
- * * "License"); you may not use this file except in compliance
- * * with the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
-
package org.apache.ratis.util;
+import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.TimeoutIOException;
import org.slf4j.Logger;
+import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
+import java.io.ObjectInputStream;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
@@ -88,9 +88,10 @@ public interface IOUtils {
}
}
- static boolean shouldReconnect(Exception e) {
+ static boolean shouldReconnect(Throwable e) {
return ReflectionUtils.isInstance(e,
- SocketException.class, SocketTimeoutException.class, ClosedChannelException.class, EOFException.class);
+ SocketException.class, SocketTimeoutException.class, ClosedChannelException.class, EOFException.class,
+ AlreadyClosedException.class);
}
static void readFully(InputStream in, int buffSize) throws IOException {
@@ -184,4 +185,23 @@ public interface IOUtils {
}
}
}
+
+ /** Serialize the given object to a byte array using {@link java.io.ObjectOutputStream#writeObject(Object)}. */
+ static byte[] object2Bytes(Object obj) {
+ return ProtoUtils.writeObject2ByteString(obj).toByteArray();
+ }
+
+ static <T> T bytes2Object(byte[] bytes, Class<T> clazz) {
+ return readObject(new ByteArrayInputStream(bytes), clazz);
+ }
+
+ static <T> T readObject(InputStream in, Class<T> clazz) {
+ try(ObjectInputStream oin = new ObjectInputStream(in)) {
+ return clazz.cast(oin.readObject());
+ } catch (IOException | ClassNotFoundException e) {
+ throw new IllegalStateException("Failed to read an object.", e);
+ } catch (ClassCastException e) {
+ throw new IllegalStateException("Failed to cast the object to " + clazz, e);
+ }
+ }
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index c100d44..813fac7 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.util;
+import org.apache.ratis.protocol.AlreadyClosedException;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.util.function.CheckedFunction;
@@ -54,7 +55,7 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable {
if (proxy == null) {
final LifeCycle.State current = lifeCycle.getCurrentState();
if (current.isOneOf(LifeCycle.State.CLOSING, LifeCycle.State.CLOSED)) {
- throw new IOException(name + " is already " + current);
+ throw new AlreadyClosedException(name + " is already " + current);
}
lifeCycle.startAndTransition(
() -> proxy = createProxy.apply(peer), IOException.class);
@@ -126,10 +127,13 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable {
}
}
- public void handleException(RaftPeerId serverId, Exception e, boolean reconnect) {
+ /** @return true if the given throwable is handled; otherwise, the call is an no-op, return false. */
+ public boolean handleException(RaftPeerId serverId, Throwable e, boolean reconnect) {
if (reconnect || IOUtils.shouldReconnect(e)) {
resetProxy(serverId);
+ return true;
}
+ return false;
}
public PROXY createProxyImpl(RaftPeer peer) throws IOException {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 35ee012..b1bfdbe 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,7 +17,6 @@
*/
package org.apache.ratis.util;
-import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.RaftGroupIdProto;
import org.apache.ratis.proto.RaftProtos.RaftGroupProto;
@@ -33,7 +32,6 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.ServiceException;
import java.io.IOException;
-import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Collection;
import java.util.Iterator;
@@ -54,14 +52,7 @@ public interface ProtoUtils {
}
static Object toObject(ByteString bytes) {
- try(final ObjectInputStream in = new ObjectInputStream(bytes.newInput())) {
- return in.readObject();
- } catch (IOException e) {
- throw new IllegalStateException(
- "Unexpected IOException when reading an object from a ByteString.", e);
- } catch (ClassNotFoundException e) {
- throw new IllegalStateException(e);
- }
+ return IOUtils.readObject(bytes.newInput(), Object.class);
}
static ByteString toByteString(String string) {
@@ -167,9 +158,4 @@ public interface ProtoUtils {
static String toString(RequestVoteReplyProto proto) {
return toString(proto.getServerReply()) + "-t" + proto.getTerm();
}
- static String toString(AppendEntriesReplyProto proto) {
- return toString(proto.getServerReply()) + "-t" + proto.getTerm()
- + ", nextIndex=" + proto.getNextIndex()
- + ", result: " + proto.getResult();
- }
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index f9c8fc5..1892410 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -420,7 +420,7 @@ public class LogAppender {
if (nextIndex < oldNextIndex) {
throw new IllegalStateException("nextIndex=" + nextIndex
+ " < oldNextIndex=" + oldNextIndex
- + ", reply=" + ProtoUtils.toString(reply));
+ + ", reply=" + ServerProtoUtils.toString(reply));
}
if (nextIndex > oldNextIndex) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 494a596..a9ba7b2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -885,7 +885,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
leaderId, getId(), groupId, currentTerm, followerCommit, nextIndex, NOT_LEADER, callId);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Not recognize {} (term={}) as leader, state: {} reply: {}",
- getId(), leaderId, leaderTerm, state, ProtoUtils.toString(reply));
+ getId(), leaderId, leaderTerm, state, ServerProtoUtils.toString(reply));
}
return CompletableFuture.completedFuture(reply);
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
index c9c9f9b..ea47e03 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedClientRpc.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,7 +21,6 @@ import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
class SimulatedClientRpc
extends SimulatedRequestReply<RaftClientRequest, RaftClientReply>
@@ -36,11 +35,6 @@ class SimulatedClientRpc
}
@Override
- public void handleException(RaftPeerId serverId, Exception e, boolean reconnect) {
- // do nothing
- }
-
- @Override
public void close() {
// do nothing
}