You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2011/03/08 02:01:18 UTC
svn commit: r1079060 - in /avro/trunk: ./
lang/java/ipc/src/main/java/org/apache/avro/ipc/
lang/java/ipc/src/test/java/org/apache/avro/
Author: cutting
Date: Tue Mar 8 01:01:17 2011
New Revision: 1079060
URL: http://svn.apache.org/viewvc?rev=1079060&view=rev
Log:
AVRO-761. Java: Fix Requestor to not send client's protocol redundantly.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/RPCContext.java
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolHttp.java
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolSpecific.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1079060&r1=1079059&r2=1079060&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Tue Mar 8 01:01:17 2011
@@ -300,6 +300,10 @@ Avro 1.5.0 (4 March 2011)
AVRO-775. Java: Fix a file handle leak in DataFileReader. (cutting)
+ AVRO-761. Java: Fix Requestor to not send client's protocol on
+ each handshake with stateless (HTTP) transport when protocol
+ differs from server's. (cutting)
+
Avro 1.4.1 (13 October 2010)
NEW FEATURES
Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/RPCContext.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/RPCContext.java?rev=1079060&r1=1079059&r2=1079060&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/RPCContext.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/RPCContext.java Tue Mar 8 01:01:17 2011
@@ -35,7 +35,9 @@ import org.apache.avro.Protocol.Message;
*/
public class RPCContext {
- protected Map<CharSequence,ByteBuffer> requestHandshakeMeta, responseHandshakeMeta;
+ private HandshakeRequest handshakeRequest;
+ private HandshakeResponse handshakeResponse;
+
protected Map<CharSequence,ByteBuffer> requestCallMeta, responseCallMeta;
protected Object response;
@@ -44,6 +46,26 @@ public class RPCContext {
List<ByteBuffer> requestPayload;
List<ByteBuffer> responsePayload;
+ /** Set the handshake request of this RPC. */
+ public void setHandshakeRequest(HandshakeRequest handshakeRequest) {
+ this.handshakeRequest = handshakeRequest;
+ }
+
+ /** Get the handshake request of this RPC. */
+ public HandshakeRequest getHandshakeRequest() {
+ return this.handshakeRequest;
+ }
+
+ /** Set the handshake response of this RPC. */
+ public void setHandshakeResponse(HandshakeResponse handshakeResponse) {
+ this.handshakeResponse = handshakeResponse;
+ }
+
+ /** Get the handshake response of this RPC. */
+ public HandshakeResponse getHandshakeResponse() {
+ return this.handshakeResponse;
+ }
+
/**
* This is an access method for the handshake state
* provided by the client to the server.
@@ -51,14 +73,13 @@ public class RPCContext {
* the client to the server
*/
public Map<CharSequence,ByteBuffer> requestHandshakeMeta() {
- if (requestHandshakeMeta == null) {
- requestHandshakeMeta = new HashMap<CharSequence,ByteBuffer>();
- }
- return requestHandshakeMeta;
+ if (handshakeRequest.meta == null)
+ handshakeRequest.meta = new HashMap<CharSequence,ByteBuffer>();
+ return handshakeRequest.meta;
}
void setRequestHandshakeMeta(Map<CharSequence,ByteBuffer> newmeta) {
- requestHandshakeMeta = newmeta;
+ handshakeRequest.meta = newmeta;
}
/**
@@ -68,14 +89,13 @@ public class RPCContext {
* the server to the client
*/
public Map<CharSequence,ByteBuffer> responseHandshakeMeta() {
- if (responseHandshakeMeta == null) {
- responseHandshakeMeta = new HashMap<CharSequence,ByteBuffer>();
- }
- return responseHandshakeMeta;
+ if (handshakeResponse.meta == null)
+ handshakeResponse.meta = new HashMap<CharSequence,ByteBuffer>();
+ return handshakeResponse.meta;
}
void setResponseHandshakeMeta(Map<CharSequence,ByteBuffer> newmeta) {
- responseHandshakeMeta = newmeta;
+ handshakeResponse.meta = newmeta;
}
/**
Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java?rev=1079060&r1=1079059&r2=1079060&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Requestor.java Tue Mar 8 01:01:17 2011
@@ -197,6 +197,7 @@ public abstract class Requestor {
handshake.clientProtocol = new Utf8(local.toString());
RPCContext context = new RPCContext();
+ context.setHandshakeRequest(handshake);
for (RPCPlugin plugin : rpcMetaPlugins) {
plugin.clientStartConnect(context);
}
@@ -212,11 +213,13 @@ public abstract class Requestor {
switch (handshake.match) {
case BOTH:
established = true;
+ sendLocalText = false;
break;
case CLIENT:
LOG.debug("Handshake match = CLIENT");
setRemote(handshake);
established = true;
+ sendLocalText = false;
break;
case NONE:
LOG.debug("Handshake match = NONE");
@@ -228,10 +231,7 @@ public abstract class Requestor {
}
RPCContext context = new RPCContext();
- if (handshake.meta != null) {
- context.setResponseHandshakeMeta((Map<CharSequence, ByteBuffer>) handshake.meta);
- }
-
+ context.setHandshakeResponse(handshake);
for (RPCPlugin plugin : rpcMetaPlugins) {
plugin.clientFinishConnect(context);
}
Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java?rev=1079060&r1=1079059&r2=1079060&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/Responder.java Tue Mar 8 01:01:17 2011
@@ -226,13 +226,11 @@ public abstract class Responder {
}
RPCContext context = new RPCContext();
- context.setRequestHandshakeMeta((Map<CharSequence, ByteBuffer>) request.meta);
-
+ context.setHandshakeRequest(request);
+ context.setHandshakeResponse(response);
for (RPCPlugin plugin : rpcMetaPlugins) {
plugin.serverConnecting(context);
}
- response.meta = context.responseHandshakeMeta();
-
handshakeWriter.write(response, out);
if (connection != null && response.match != HandshakeMatch.NONE)
Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolHttp.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolHttp.java?rev=1079060&r1=1079059&r2=1079060&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolHttp.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolHttp.java Tue Mar 8 01:01:17 2011
@@ -81,8 +81,12 @@ public class TestProtocolHttp extends Te
protocol.getMessages().put("ack", message);
// call a server over a stateless protocol that has a one-way "ack"
- new GenericRequestor(protocol, createTransceiver())
- .request("ack", new GenericData.Record(message.getRequest()));
+ GenericRequestor requestor =
+ new GenericRequestor(protocol, createTransceiver());
+ requestor.request("ack", new GenericData.Record(message.getRequest()));
+
+ // make the request again, to better test handshakes w/ differing protocols
+ requestor.request("ack", new GenericData.Record(message.getRequest()));
}
}
Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolSpecific.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolSpecific.java?rev=1079060&r1=1079059&r2=1079060&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolSpecific.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/TestProtocolSpecific.java Tue Mar 8 01:01:17 2011
@@ -54,6 +54,7 @@ import java.nio.ByteBuffer;
import java.util.Random;
import java.util.List;
import java.util.ArrayList;
+import java.util.HashSet;
public class TestProtocolSpecific {
@@ -263,6 +264,7 @@ public class TestProtocolSpecific {
public class HandshakeMonitor extends RPCPlugin{
private int handshakes;
+ private HashSet<CharSequence> seenProtocols = new HashSet<CharSequence>();
@Override
public void serverConnecting(RPCContext context) {
@@ -271,6 +273,14 @@ public class TestProtocolSpecific {
if(expected > 0 && handshakes > expected){
throw new IllegalStateException("Expected number of Protocol negotiation handshakes exceeded expected "+expected+" was "+handshakes);
}
+
+ // check that a given client protocol is only sent once
+ CharSequence clientProtocol =
+ context.getHandshakeRequest().clientProtocol;
+ if (clientProtocol != null) {
+ assertFalse(seenProtocols.contains(clientProtocol));
+ seenProtocols.add(clientProtocol);
+ }
}
public void assertHandshake(){