You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tuscany.apache.org by wj...@apache.org on 2009/04/13 11:41:01 UTC

svn commit: r764382 - in /tuscany/branches/sca-java-1.x/modules/binding-erlang-runtime/src: main/java/org/apache/tuscany/sca/binding/erlang/impl/ test/java/org/apache/tuscany/sca/binding/erlang/testing/

Author: wjaniszewski
Date: Mon Apr 13 09:41:00 2009
New Revision: 764382

URL: http://svn.apache.org/viewvc?rev=764382&view=rev
Log:
General improvements for messaging, fixed some issues regarding communication with real Erlang nodes

Modified:
    tuscany/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java
    tuscany/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java
    tuscany/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/MboxListener.java
    tuscany/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/ReferenceServiceTestCase.java

Modified: tuscany/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java
URL: http://svn.apache.org/viewvc/tuscany/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java?rev=764382&r1=764381&r2=764382&view=diff
==============================================================================
--- tuscany/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java (original)
+++ tuscany/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ErlangInvoker.java Mon Apr 13 09:41:00 2009
@@ -81,9 +81,12 @@
 			}
 			tmpMbox = node.createMbox();
 			Object[] args = msg.getBody();
-			OtpErlangObject msgPayload = TypeHelpersProxy.toErlang(args);
+			// create and send msg with self pid in the beginning
+			OtpErlangObject[] argsArray = { tmpMbox.self(),
+					TypeHelpersProxy.toErlang(args) };
+			OtpErlangObject otpArgs = new OtpErlangTuple(argsArray);
 			tmpMbox.send(msg.getOperation().getName(), binding.getNode(),
-					msgPayload);
+					otpArgs);
 			if (msg.getOperation().getOutputType() != null) {
 				OtpMsg resultMsg = null;
 				if (binding.hasTimeout()) {
@@ -149,12 +152,8 @@
 				reportProblem(msg, e);
 				msg.setBody(null);
 			} else if (msg.getOperation().getOutputType() != null) {
-				if (result.getClass().equals(OtpErlangTuple.class)) {
-					OtpErlangObject resultBody = ((OtpErlangTuple) result)
-							.elementAt(1);
-					msg.setBody(TypeHelpersProxy.toJava(resultBody, msg
-							.getOperation().getOutputType().getPhysical()));
-				}
+				msg.setBody(TypeHelpersProxy.toJava(result, msg.getOperation()
+						.getOutputType().getPhysical()));
 			}
 		} catch (OtpAuthException e) {
 			// TODO: externalize message?

Modified: tuscany/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java
URL: http://svn.apache.org/viewvc/tuscany/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java?rev=764382&r1=764381&r2=764382&view=diff
==============================================================================
--- tuscany/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java (original)
+++ tuscany/branches/sca-java-1.x/modules/binding-erlang-runtime/src/main/java/org/apache/tuscany/sca/binding/erlang/impl/ServiceExecutor.java Mon Apr 13 09:41:00 2009
@@ -43,9 +43,7 @@
 import com.ericsson.otp.erlang.OtpErlangRef;
 import com.ericsson.otp.erlang.OtpErlangString;
 import com.ericsson.otp.erlang.OtpErlangTuple;
-import com.ericsson.otp.erlang.OtpMbox;
 import com.ericsson.otp.erlang.OtpMsg;
-import com.ericsson.otp.erlang.OtpNode;
 
 /**
  * @version $Rev$ $Date$
@@ -72,17 +70,18 @@
 	private void sendMessage(OtpConnection connection, OtpErlangPid pid,
 			OtpErlangRef ref, OtpErlangAtom head, OtpErlangObject message)
 			throws IOException {
-		OtpErlangTuple tResult = new OtpErlangTuple(new OtpErlangObject[] {
-				head, message });
+		OtpErlangObject tResult = null;
+		if (head != null) {
+			tResult = new OtpErlangTuple(
+					new OtpErlangObject[] { head, message });
+		} else {
+			tResult = message;
+		}
 		OtpErlangObject msg = null;
 		msg = new OtpErlangTuple(new OtpErlangObject[] { ref, tResult });
 		connection.send(pid, msg);
 	}
 
-	private String getResponseClientNodeName(OtpErlangPid pid) {
-		return "_response_connector_to_" + pid + System.currentTimeMillis();
-	}
-
 	private void handleRpc(OtpMsg msg) {
 		OtpErlangTuple request = null;
 		OtpErlangPid senderPid = null;
@@ -148,8 +147,8 @@
 							Object[] arrArg = new Object[] { result };
 							response = TypeHelpersProxy.toErlang(arrArg);
 						}
-						sendMessage(connection, senderPid, senderRef,
-								MessageHelper.ATOM_OK, response);
+						sendMessage(connection, senderPid, senderRef, null,
+								response);
 					} catch (Exception e) {
 						if ((e.getClass().equals(
 								InvocationTargetException.class) && e
@@ -207,8 +206,24 @@
 	private void handleMsg(OtpMsg msg) {
 		Operation matchedOperation = null;
 		Object args[] = null;
+		OtpErlangPid senderPid = null;
+		OtpErlangObject msgNoSender = null;
 		List<Operation> operations = groupedOperations.get(msg
 				.getRecipientName());
+		try {
+			if (msg.getMsg().getClass().equals(OtpErlangTuple.class)
+					&& (((OtpErlangTuple) msg.getMsg()).elementAt(0))
+							.getClass().equals(OtpErlangPid.class)) {
+				senderPid = (OtpErlangPid) ((OtpErlangTuple) msg.getMsg())
+						.elementAt(0);
+				msgNoSender = ((OtpErlangTuple) msg.getMsg()).elementAt(1);
+			} else {
+				msgNoSender = msg.getMsg();
+			}
+		} catch (Exception e) {
+
+		}
+
 		if (operations == null) {
 			// TODO: externalize message?
 			// NOTE: I assume in Erlang sender doesn't get confirmation so
@@ -224,7 +239,7 @@
 					forClasses[i] = iTypes.get(i).getPhysical();
 				}
 				try {
-					args = TypeHelpersProxy.toJavaAsArgs(msg.getMsg(),
+					args = TypeHelpersProxy.toJavaAsArgs(msgNoSender,
 							forClasses);
 					matchedOperation = operation;
 					break;
@@ -247,11 +262,12 @@
 						Object[] arrArg = new Object[] { result };
 						response = TypeHelpersProxy.toErlang(arrArg);
 					}
-					if (response != null) {
-						OtpNode node = new OtpNode(
-								getResponseClientNodeName(msg.getSenderPid()));
-						OtpMbox mbox = node.createMbox();
-						mbox.send(msg.getSenderPid(), response);
+					if (response != null && senderPid != null) {
+						connection.send(senderPid, response);
+					} else if (response != null && senderPid == null) {
+						// FIXME: cannot send reply - sender didn't provide a
+						// pid. Use the PID obtained by jinterface or log this error?
+						// connection.send(msg.getSenderPid(), response);
 					}
 				} catch (InvocationTargetException e) {
 					// FIXME: use linking feature? send some error?
@@ -262,17 +278,11 @@
 					e.printStackTrace();
 				}
 			} else {
-				try {
-					// TODO: externalize message?
-					// NOTE: don't send error message if mapping not found
-					logger.log(Level.WARNING,
-							"No mapping for such arguments in '"
-									+ msg.getRecipientName()
-									+ "' operation in '" + name
-									+ "' node. Recevied arguments: "
-									+ msg.getMsg());
-				} catch (OtpErlangDecodeException e) {
-				}
+				// TODO: externalize message?
+				// NOTE: don't send error message if mapping not found
+				logger.log(Level.WARNING, "No mapping for such arguments in '"
+						+ msg.getRecipientName() + "' operation in '" + name
+						+ "' node. Recevied arguments: " + msgNoSender);
 			}
 		}
 	}

Modified: tuscany/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/MboxListener.java
URL: http://svn.apache.org/viewvc/tuscany/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/MboxListener.java?rev=764382&r1=764381&r2=764382&view=diff
==============================================================================
--- tuscany/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/MboxListener.java (original)
+++ tuscany/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/MboxListener.java Mon Apr 13 09:41:00 2009
@@ -21,6 +21,8 @@
 
 import org.apache.tuscany.sca.binding.erlang.impl.types.TypeHelpersProxy;
 
+import com.ericsson.otp.erlang.OtpErlangObject;
+import com.ericsson.otp.erlang.OtpErlangTuple;
 import com.ericsson.otp.erlang.OtpMbox;
 import com.ericsson.otp.erlang.OtpMsg;
 
@@ -58,22 +60,27 @@
 		}
 	}
 
-	public OtpMsg getMsg() {
-		// Sometimes clients tries to get message which isn't fully received.
-		// If so - give it more tries. This sometimes caused
-		// NullPointerException in
-		// ReferenceServiceTestCase.testMultipleArguments().
-		for (int i = 0; i < 3; i++) {
-			if (msg != null) {
-				return msg;
-			} else {
-				try {
-					Thread.sleep(100);
-				} catch (InterruptedException e) {
+	public OtpErlangObject getMsg() {
+		try {
+			// Sometimes a client tries to get a message which isn't fully
+			// received yet.
+			// If so - give it more tries. This sometimes caused a
+			// NullPointerException in
+			// ReferenceServiceTestCase.testMultipleArguments().
+			for (int i = 0; i < 3; i++) {
+				if (msg != null) {
+					return ((OtpErlangTuple) msg.getMsg()).elementAt(1);
+				} else {
+					try {
+						Thread.sleep(100);
+					} catch (InterruptedException e) {
+					}
 				}
 			}
+			return msg.getMsg();
+		} catch (Exception e) {
+
 		}
-		return msg;
+		return null;
 	}
-
 }

Modified: tuscany/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/ReferenceServiceTestCase.java
URL: http://svn.apache.org/viewvc/tuscany/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/ReferenceServiceTestCase.java?rev=764382&r1=764381&r2=764382&view=diff
==============================================================================
--- tuscany/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/ReferenceServiceTestCase.java (original)
+++ tuscany/branches/sca-java-1.x/modules/binding-erlang-runtime/src/test/java/org/apache/tuscany/sca/binding/erlang/testing/ReferenceServiceTestCase.java Mon Apr 13 09:41:00 2009
@@ -130,7 +130,7 @@
 		Thread mboxThread = new Thread(mboxListener);
 		mboxThread.start();
 		String testResult = mboxReference.sendArgs(strArg);
-		assertEquals(strArg, ((OtpErlangString) mboxListener.getMsg().getMsg())
+		assertEquals(strArg, ((OtpErlangString) mboxListener.getMsg())
 				.stringValue());
 		assertEquals(strResult, testResult);
 	}
@@ -148,8 +148,8 @@
 		Thread mboxThread = new Thread(mboxListener);
 		mboxThread.start();
 		boolean testResult = mboxReference.sendArgs(booleanArg);
-		assertEquals(booleanArg, ((OtpErlangAtom) mboxListener.getMsg()
-				.getMsg()).booleanValue());
+		assertEquals(booleanArg, ((OtpErlangAtom) mboxListener.getMsg())
+				.booleanValue());
 		assertEquals(booleanResult, testResult);
 	}
 
@@ -166,8 +166,8 @@
 		Thread mboxThread = new Thread(mboxListener);
 		mboxThread.start();
 		float testResult = mboxReference.sendArgs(floatArg);
-		assertEquals(floatArg, ((OtpErlangDouble) mboxListener.getMsg()
-				.getMsg()).doubleValue(), 0);
+		assertEquals(floatArg, ((OtpErlangDouble) mboxListener.getMsg())
+				.doubleValue(), 0);
 		assertEquals(floatResult, testResult, 0);
 	}
 
@@ -184,8 +184,8 @@
 		Thread mboxThread = new Thread(mboxListener);
 		mboxThread.start();
 		double testResult = mboxReference.sendArgs(doubleArg);
-		assertEquals(doubleArg, ((OtpErlangDouble) mboxListener.getMsg()
-				.getMsg()).doubleValue(), 0);
+		assertEquals(doubleArg, ((OtpErlangDouble) mboxListener.getMsg())
+				.doubleValue(), 0);
 		assertEquals(doubleResult, testResult, 0);
 	}
 
@@ -202,7 +202,7 @@
 		Thread mboxThread = new Thread(mboxListener);
 		mboxThread.start();
 		long testResult = mboxReference.sendArgs(longArg);
-		assertEquals(longArg, ((OtpErlangLong) mboxListener.getMsg().getMsg())
+		assertEquals(longArg, ((OtpErlangLong) mboxListener.getMsg())
 				.longValue(), 0);
 		assertEquals(longResult, testResult, 0);
 	}
@@ -220,8 +220,8 @@
 		Thread mboxThread = new Thread(mboxListener);
 		mboxThread.start();
 		int testResult = mboxReference.sendArgs(intArg);
-		assertEquals(intArg, ((OtpErlangLong) mboxListener.getMsg().getMsg())
-				.intValue(), 0);
+		assertEquals(intArg,
+				((OtpErlangLong) mboxListener.getMsg()).intValue(), 0);
 		assertEquals(intResult, testResult, 0);
 	}
 
@@ -238,7 +238,7 @@
 		Thread mboxThread = new Thread(mboxListener);
 		mboxThread.start();
 		char testResult = mboxReference.sendArgs(charArg);
-		assertEquals(charArg, ((OtpErlangLong) mboxListener.getMsg().getMsg())
+		assertEquals(charArg, ((OtpErlangLong) mboxListener.getMsg())
 				.charValue(), 0);
 		assertEquals(charResult, testResult, 0);
 	}
@@ -256,7 +256,7 @@
 		Thread mboxThread = new Thread(mboxListener);
 		mboxThread.start();
 		short testResult = mboxReference.sendArgs(shortArg);
-		assertEquals(shortArg, ((OtpErlangLong) mboxListener.getMsg().getMsg())
+		assertEquals(shortArg, ((OtpErlangLong) mboxListener.getMsg())
 				.shortValue(), 0);
 		assertEquals(shortResult, testResult, 0);
 	}
@@ -274,7 +274,7 @@
 		Thread mboxThread = new Thread(mboxListener);
 		mboxThread.start();
 		byte testResult = mboxReference.sendArgs(byteArg);
-		assertEquals(byteArg, ((OtpErlangLong) mboxListener.getMsg().getMsg())
+		assertEquals(byteArg, ((OtpErlangLong) mboxListener.getMsg())
 				.byteValue(), 0);
 		assertEquals(byteResult, testResult, 0);
 	}
@@ -293,10 +293,10 @@
 		int testInt = 10;
 		mboxReference.sendArgs(testInt, testString);
 		assertEquals(testInt, ((OtpErlangLong) ((OtpErlangTuple) mboxListener
-				.getMsg().getMsg()).elementAt(0)).longValue());
+				.getMsg()).elementAt(0)).longValue());
 		assertEquals(testString,
-				((OtpErlangString) ((OtpErlangTuple) mboxListener.getMsg()
-						.getMsg()).elementAt(1)).stringValue());
+				((OtpErlangString) ((OtpErlangTuple) mboxListener.getMsg())
+						.elementAt(1)).stringValue());
 	}
 
 	/**
@@ -321,8 +321,7 @@
 		testArg.arg1.arg2 = "Arg2b";
 		StructuredTuple testResult = mboxReference.sendArgs(testArg);
 		assertEquals(tupleResult, testResult);
-		OtpErlangTuple received = (OtpErlangTuple) mboxListener.getMsg()
-				.getMsg();
+		OtpErlangTuple received = (OtpErlangTuple) mboxListener.getMsg();
 		assertEquals(testArg.arg1.arg1,
 				((OtpErlangLong) ((OtpErlangTuple) received.elementAt(0))
 						.elementAt(0)).longValue());
@@ -351,8 +350,7 @@
 		for (int i = 0; i < testArg.length; i++) {
 			assertEquals(testArg[i], testResult[i]);
 		}
-		OtpErlangBinary received = (OtpErlangBinary) mboxListener.getMsg()
-				.getMsg();
+		OtpErlangBinary received = (OtpErlangBinary) mboxListener.getMsg();
 		assertEquals(testArg.length, received.size());
 		for (int i = 0; i < testArg.length; i++) {
 			assertEquals(testArg[i], received.binaryValue()[i]);
@@ -375,7 +373,7 @@
 		for (int i = 0; i < testArg.length; i++) {
 			assertEquals(testArg[i], testResult[i]);
 		}
-		OtpErlangList received = (OtpErlangList) mboxListener.getMsg().getMsg();
+		OtpErlangList received = (OtpErlangList) mboxListener.getMsg();
 		assertEquals(testArg.length, received.arity());
 		for (int i = 0; i < testArg.length; i++) {
 			assertEquals(testArg[i], ((OtpErlangString) received.elementAt(i))
@@ -402,7 +400,7 @@
 				assertEquals(testArg[i][j], testResult[i][j]);
 			}
 		}
-		OtpErlangList received = (OtpErlangList) mboxListener.getMsg().getMsg();
+		OtpErlangList received = (OtpErlangList) mboxListener.getMsg();
 		assertEquals(testArg.length, received.arity());
 		for (int i = 0; i < testArg.length; i++) {
 			for (int j = 0; j < testArg[i].length; j++) {
@@ -631,19 +629,22 @@
 			assertEquals(ErlangException.class, e.getClass());
 		}
 	}
-	
+
 	/**
 	 * Tests mbox with retrieving and answering with basic arguments
 	 * 
 	 * @throws Exception
 	 */
-	@Test(timeout = 1000)
+	@Test(timeout = 2000)
 	public void testMbox() throws Exception {
 		OtpErlangObject[] args = new OtpErlangObject[2];
 		args[0] = new OtpErlangString("world");
 		args[1] = new OtpErlangString("!");
 		OtpErlangTuple tuple = new OtpErlangTuple(args);
-		refMbox.send("sayHello", "RPCServerMbox", tuple);
+		OtpErlangObject[] argsWithSender = new OtpErlangObject[2];
+		argsWithSender[0] = refMbox.self();
+		argsWithSender[1] = tuple;
+		refMbox.send("sayHello", "RPCServerMbox", new OtpErlangTuple(argsWithSender));
 		OtpErlangString result = (OtpErlangString) refMbox.receiveMsg()
 				.getMsg();
 		assertEquals("Hello world !", result.stringValue());
@@ -679,7 +680,10 @@
 		argsContent[0] = structuredTuple;
 		argsContent[1] = list;
 		OtpErlangTuple args = new OtpErlangTuple(argsContent);
-		refMbox.send("passComplexArgs", "RPCServerMbox", args);
+		OtpErlangObject[] withSender = new OtpErlangObject[2];
+		withSender[0] = refMbox.self();
+		withSender[1] = args;
+		refMbox.send("passComplexArgs", "RPCServerMbox", new OtpErlangTuple(withSender));
 		OtpErlangObject result = refMbox.receiveMsg().getMsg();
 		assertEquals(arg1,
 				((OtpErlangLong) ((OtpErlangTuple) ((OtpErlangTuple) result)
@@ -766,6 +770,7 @@
 
 	/**
 	 * Tests timeout feature for service side bindings
+	 * 
 	 * @throws Exception
 	 */
 	@Test(timeout = 4000)
@@ -811,5 +816,5 @@
 		cookieModuleReference.sayHellos();
 
 	}
-	
+
 }