You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by as...@apache.org on 2008/12/30 21:07:47 UTC
svn commit: r730227 - in /synapse/trunk/java:
modules/samples/src/main/java/samples/mediators/
modules/samples/src/main/java/samples/userguide/
modules/samples/src/main/scripts/ repository/conf/sample/ src/site/xdoc/
Author: asanka
Date: Tue Dec 30 12:07:46 2008
New Revision: 730227
URL: http://svn.apache.org/viewvc?rev=730227&view=rev
Log:
Fixing SYNAPSE-427
Added:
synapse/trunk/java/modules/samples/src/main/java/samples/mediators/BinaryExtractMediator.java
synapse/trunk/java/modules/samples/src/main/java/samples/userguide/MDDConsumer.java
synapse/trunk/java/modules/samples/src/main/java/samples/userguide/MDDProducer.java
synapse/trunk/java/repository/conf/sample/synapse_sample_381.xml
Modified:
synapse/trunk/java/modules/samples/src/main/scripts/build.xml
synapse/trunk/java/src/site/xdoc/Synapse_Samples.xml
Added: synapse/trunk/java/modules/samples/src/main/java/samples/mediators/BinaryExtractMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/src/main/java/samples/mediators/BinaryExtractMediator.java?rev=730227&view=auto
==============================================================================
--- synapse/trunk/java/modules/samples/src/main/java/samples/mediators/BinaryExtractMediator.java (added)
+++ synapse/trunk/java/modules/samples/src/main/java/samples/mediators/BinaryExtractMediator.java Tue Dec 30 12:07:46 2008
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samples.mediators;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.OMText;
+import org.apache.axiom.soap.SOAPBody;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.mediators.AbstractMediator;
+
+import javax.activation.DataHandler;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * This mediator analyze a binary message and convert binary to a given datatype and set is as a message property.
+ * User can use the message property for CBR.
+ * User has to set the offset : where to start
+ * length : how many bytes to read
+ * binaryEncodig : utf-8, utf-16, ASCII, Base64
+ * VariableName : property name set with the decoded value in the message context
+ * These values should set as parameters from the synapse configuration layer.
+ */
+public class BinaryExtractMediator extends AbstractMediator {
+
+ private static final Log log = LogFactory.getLog( BinaryExtractMediator.class);
+ private static final String PROP_NAME = "SearchKey";
+
+ private int length=1;
+ private int offset=1;
+ private int dataType=1; // Not using this is supporting only char[]/String data types
+ private String binaryEncoding="utf-8";
+ private String variableName= PROP_NAME;
+
+ public BinaryExtractMediator(){}
+
+ public boolean mediate(MessageContext msgCtx) {
+ try {
+ log.debug("BinaryExtractMediator Process, with offset: "+offset+" ,length "+length);
+ SOAPBody soapBody = msgCtx.getEnvelope().getBody();
+ OMElement firstElement = soapBody.getFirstElement();
+ log.debug("First Element : "+firstElement.getLocalName());
+ log.debug("First Element Text : "+firstElement.getText());
+ OMText binaryNode =(OMText) firstElement.getFirstOMChild();
+ log.debug("First Element Node Text : "+binaryNode.getText());
+ DataHandler dataHandler =(DataHandler) binaryNode.getDataHandler();
+ InputStream inputStream = dataHandler.getInputStream();
+ byte[] searchByte = new byte[length];
+ inputStream.skip(offset-1);
+ int readBytes = inputStream.read(searchByte,0,length);
+ String outString = new String(searchByte,binaryEncoding);
+ msgCtx.setProperty(variableName,outString);
+ log.debug("Set property to MsgCtx, "+variableName+" = "+outString);
+ inputStream.close();
+ } catch (IOException e) {
+ log.error("Excepton on mediation : "+e.getMessage());
+ }
+ return true;
+ }
+
+ public String getType() {
+ return null;
+ }
+
+ public void setTraceState(int traceState) {
+ traceState = traceState;
+ }
+
+ public int getTraceState() {
+ return traceState;
+ }
+
+ public int getLength() {
+ return length;
+ }
+
+ public void setLength(int length) {
+ this.length = length;
+ }
+
+ public int getDataType() {
+ return dataType;
+ }
+
+ public void setDataType(int dataType) {
+ this.dataType = dataType;
+ }
+
+ public int getOffset() {
+ return offset;
+ }
+
+ public void setOffset(int offset) {
+ this.offset = offset;
+ }
+
+ public String getBinaryEncoding() {
+ return binaryEncoding;
+ }
+
+ public void setBinaryEncoding(String binaryEncoding) {
+ this.binaryEncoding = binaryEncoding;
+ }
+
+ public String getVariableName() {
+ return variableName;
+ }
+
+ public void setVariableName(String variableName) {
+ this.variableName = variableName;
+ }
+}
Added: synapse/trunk/java/modules/samples/src/main/java/samples/userguide/MDDConsumer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/src/main/java/samples/userguide/MDDConsumer.java?rev=730227&view=auto
==============================================================================
--- synapse/trunk/java/modules/samples/src/main/java/samples/userguide/MDDConsumer.java (added)
+++ synapse/trunk/java/modules/samples/src/main/java/samples/userguide/MDDConsumer.java Tue Dec 30 12:07:46 2008
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samples.userguide;
+
+
+import javax.jms.*;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.util.Properties;
+
+
+public class MDDConsumer implements MessageListener {
+
+ private static String getProperty(String name, String def) {
+ String result = System.getProperty(name);
+ if (result == null || result.length() == 0) {
+ result = def;
+ }
+ return result;
+ }
+
+ public static void main(String[] args) throws Exception {
+ String dest = getProperty("jms_topic", "mdd.MSFT");
+ MDDConsumer app = new MDDConsumer();
+ app.run(dest);
+ }
+ public void run(String dest) throws Exception {
+ InitialContext ic = getInitialContext();
+ QueueConnectionFactory confac = (QueueConnectionFactory) ic.lookup("ConnectionFactory");
+ QueueConnection connection = confac.createQueueConnection();
+ QueueSession session = connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic(dest);
+ MessageConsumer consumer = session.createConsumer(topic);
+ consumer.setMessageListener(this);
+ connection.start();
+ }
+ public void onMessage(Message message){
+ try {
+ System.out.println(" Market data recived for symbol : "+ message.getJMSDestination().toString());
+ } catch (JMSException e) {
+ System.out.println("Error : "+e.getMessage());
+ }
+ }
+ private InitialContext getInitialContext() throws NamingException {
+ Properties env = new Properties();
+ if (System.getProperty("java.naming.provider.url") == null) {
+ env.put("java.naming.provider.url", "tcp://localhost:61616");
+ }
+ if (System.getProperty("java.naming.factory.initial") == null) {
+ env.put("java.naming.factory.initial",
+ "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+ }
+ return new InitialContext(env);
+ }
+}
Added: synapse/trunk/java/modules/samples/src/main/java/samples/userguide/MDDProducer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/src/main/java/samples/userguide/MDDProducer.java?rev=730227&view=auto
==============================================================================
--- synapse/trunk/java/modules/samples/src/main/java/samples/userguide/MDDProducer.java (added)
+++ synapse/trunk/java/modules/samples/src/main/java/samples/userguide/MDDProducer.java Tue Dec 30 12:07:46 2008
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package samples.userguide;
+
+import javax.jms.*;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.util.Properties;
+import java.io.*;
+
+/**
+ * Sample JMS client that sends one market data record as a JMS BytesMessage.
+ *
+ * The record is written with a DataOutputStream in the order: sequence value (double),
+ * symbol (writeUTF), price (double), market (writeUTF). The 8-byte double and the 2-byte
+ * length prefix of writeUTF put the first symbol character at byte position 11 (1-based).
+ */
+public class MDDProducer {
+
+    /** Returns the named system property, or {@code def} when it is unset or empty. */
+    private static String getProperty(String name, String def) {
+        String result = System.getProperty(name);
+        if (result == null || result.length() == 0) {
+            result = def;
+        }
+        return result;
+    }
+
+    /**
+     * Builds the binary record from the "symbol", "price" and "market" system properties and
+     * sends it to the destination named by the "jms_dest" system property.
+     */
+    public static void main(String[] args) throws Exception {
+
+        String dest = getProperty("jms_dest", "dynamicQueues/JMSBinaryProxy");
+        String symbol = getProperty("symbol", "MSFT");
+        String price = getProperty("price", "100.20");
+        String market = getProperty("market", "NYSE");
+
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos);
+        try {
+            dos.writeDouble(getRandom(1, 1, true));
+            dos.writeUTF(symbol);
+            dos.writeDouble(Double.parseDouble(price));
+            dos.writeUTF(market);
+            dos.flush();
+        } finally {
+            dos.close();
+        }
+
+        MDDProducer app = new MDDProducer();
+        app.sendBytesMessage(dest, bos.toByteArray());
+    }
+
+
+    /**
+     * Sends the buffer as a single BytesMessage to the queue bound to {@code destName} in JNDI.
+     * The connection and the JNDI context are released even when sending fails.
+     */
+    private void sendBytesMessage(String destName, byte[] buffer) throws Exception {
+        InitialContext ic = getInitialContext();
+        try {
+            QueueConnectionFactory confac = (QueueConnectionFactory) ic.lookup("ConnectionFactory");
+            Queue queue = (Queue) ic.lookup(destName);
+            QueueConnection connection = confac.createQueueConnection();
+            try {
+                QueueSession session =
+                        connection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
+                BytesMessage bm = session.createBytesMessage();
+                bm.writeBytes(buffer);
+                QueueSender sender = session.createSender(queue);
+                sender.send(bm);
+            } finally {
+                // Closing the connection also closes its sessions and senders.
+                connection.close();
+            }
+        } finally {
+            ic.close();
+        }
+    }
+
+
+    /**
+     * Creates the JNDI context, supplying ActiveMQ defaults for the provider URL and the
+     * initial context factory when the corresponding system properties are not set.
+     */
+    private InitialContext getInitialContext() throws NamingException {
+        Properties env = new Properties();
+        if (System.getProperty("java.naming.provider.url") == null) {
+            env.put("java.naming.provider.url", "tcp://localhost:61616");
+        }
+        if (System.getProperty("java.naming.factory.initial") == null) {
+            env.put("java.naming.factory.initial",
+                    "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+        }
+        return new InitialContext(env);
+    }
+
+    /**
+     * Returns {@code base} moved up or down by a random fraction of {@code varience * base}.
+     * When {@code onlypositive} is false the sign of the result is also chosen at random.
+     */
+    private static double getRandom(double base, double varience, boolean onlypositive) {
+        double rand = Math.random();
+        int direction = rand > 0.5 ? 1 : -1;
+        double value = base + (direction * varience * base * rand);
+        return value * (onlypositive ? 1 : direction);
+    }
+
+    /** Encodes an int as four bytes, most significant byte first. Not referenced in this class. */
+    private static byte[] intToByteArray(int value) {
+        return new byte[] {
+            (byte) (value >>> 24),
+            (byte) (value >>> 16),
+            (byte) (value >>> 8),
+            (byte) value};
+    }
+}
Modified: synapse/trunk/java/modules/samples/src/main/scripts/build.xml
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/samples/src/main/scripts/build.xml?rev=730227&r1=730226&r2=730227&view=diff
==============================================================================
--- synapse/trunk/java/modules/samples/src/main/scripts/build.xml (original)
+++ synapse/trunk/java/modules/samples/src/main/scripts/build.xml Tue Dec 30 12:07:46 2008
@@ -74,6 +74,14 @@
example:
ant amqpconsumer
+
+ ant mddproducer
+ A JMS client to produce market data in binary format seq#|symbol|price|market
+ [-Djms_dest=dynamicQueues/JMSBinaryProxy]
+ [-Dsymbol=MSFT] [-Dprice=100.20] [-Dmarket=NYSE]
+
+ example:
+ ant mddproducer
</echo>
</target>
@@ -101,6 +109,9 @@
<property name="propfile" value=""/>
<property name="sleep" value=""/>
<property name="session" value=""/>
+ <property name="price" value=""/>
+ <property name="market" value=""/>
+ <property name="jms_topic" value=""/>
<target name="clean">
<delete dir="target" quiet="true"/>
@@ -153,6 +164,23 @@
</java>
</target>
+ <target name="mddproducer" depends="compile">
+ <java classname="samples.userguide.MDDProducer"
+ classpathref="javac.classpath" fork="true">
+ <sysproperty key="jms_dest" value="${jms_dest}"/>
+ <sysproperty key="symbol" value="${symbol}"/>
+ <sysproperty key="price" value="${price}"/>
+ <sysproperty key="market" value="${market}"/>
+ </java>
+ </target>
+
+ <target name="mddconsumer" depends="compile">
+ <java classname="samples.userguide.MDDConsumer"
+ classpathref="javac.classpath" fork="true">
+ <sysproperty key="jms_topic" value="${jms_topic}"/>
+ </java>
+ </target>
+
<target name="jmsclient" depends="compile">
<java classname="samples.userguide.GenericJMSClient"
classpathref="javac.classpath" fork="true">
Added: synapse/trunk/java/repository/conf/sample/synapse_sample_381.xml
URL: http://svn.apache.org/viewvc/synapse/trunk/java/repository/conf/sample/synapse_sample_381.xml?rev=730227&view=auto
==============================================================================
--- synapse/trunk/java/repository/conf/sample/synapse_sample_381.xml (added)
+++ synapse/trunk/java/repository/conf/sample/synapse_sample_381.xml Tue Dec 30 12:07:46 2008
@@ -0,0 +1,57 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<!-- CBR for Binary Messages -->
+<definitions xmlns="http://ws.apache.org/ns/synapse">
+ <proxy name="JMSBinaryProxy" transports="jms">
+ <target inSequence="BINARY_CBR_SEQ" />
+ <property action="set" name="OUT_ONLY" value="true"/>
+ </proxy>
+ <sequence name="BINARY_CBR_SEQ">
+ <in>
+ <log level="full"/>
+ <class name="samples.mediators.BinaryExtractMediator">
+ <property name="offset" value="11"/>
+ <property name="length" value="4"/>
+ <property name="variableName" value="symbol"/>
+ <property name="binaryEncoding" value ="utf-8"/>
+ </class>
+ <log level="custom">
+ <property name="symbol" expression="get-property('symbol')"/>
+ </log>
+ <switch source="get-property('symbol')">
+ <case regex="GOOG">
+ <send>
+ <endpoint>
+ <address uri="jms:/mdd.GOOG?transport.jms.ConnectionFactoryJNDIName=TopicConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://localhost:61616&amp;transport.jms.DestinationType=topic"/>
+ </endpoint>
+ </send>
+ </case>
+ <case regex="MSFT">
+ <send>
+ <endpoint>
+ <address uri="jms:/mdd.MSFT?transport.jms.ConnectionFactoryJNDIName=TopicConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://localhost:61616&amp;transport.jms.DestinationType=topic"/>
+ </endpoint>
+ </send>
+ </case>
+ <default></default>
+ </switch>
+ </in>
+ </sequence>
+</definitions>
Modified: synapse/trunk/java/src/site/xdoc/Synapse_Samples.xml
URL: http://svn.apache.org/viewvc/synapse/trunk/java/src/site/xdoc/Synapse_Samples.xml?rev=730227&r1=730226&r2=730227&view=diff
==============================================================================
--- synapse/trunk/java/src/site/xdoc/Synapse_Samples.xml (original)
+++ synapse/trunk/java/src/site/xdoc/Synapse_Samples.xml Tue Dec 30 12:07:46 2008
@@ -264,7 +264,9 @@
<a href="#Class">Extending the mediation in java (Class Mediator)</a>
<ul>
<li>
-<a href="#Sample380">Sample 380: Writing your own custom mediation in Java</a></li></ul></li>
+<a href="#Sample380">Sample 380: Writing your own custom mediation in Java</a></li>
+<li>
+<a href="#Sample381">Sample 381: Class mediator to CBR binary messages</a></li></ul></li>
<li>
<a href="#XQuery">Evaluating XQuery for mediation (XQuery Mediator)</a>
<ul>
@@ -3663,6 +3665,73 @@
Original price: 162.30945327447262
Discounted price: 138.77458254967408</pre>
<p></p>
+<h2>
+<a name="Sample381" id="Sample381">Sample 381: Class mediator to CBR binary messages</a></h2>
+<pre xml:space="preserve"><definitions xmlns="http://ws.apache.org/ns/synapse">
+ <proxy name="JMSBinaryProxy" transports="jms">
+ <target inSequence="BINARY_CBR_SEQ" />
+ <property action="set" name="OUT_ONLY" value="true" />
+ </proxy>
+ <sequence name="BINARY_CBR_SEQ">
+ <in>
+ <log level="full" />
+ <class name="samples.mediators.BinaryExtractMediator">
+ <property name="offset" value="11" />
+ <property name="length" value="4" />
+ <property name="variableName" value="symbol" />
+ <property name="binaryEncoding" value="utf-8" />
+ </class>
+ <log level="custom">
+ <property name="symbol"
+ expression="get-property('symbol')" />
+ </log>
+ <switch source="get-property('symbol')">
+ <case regex="GOOG">
+ <send>
+ <endpoint>
+ <address
+ uri="jms:/mdd.GOOG?transport.jms.ConnectionFactoryJNDIName=TopicConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://localhost:61616&amp;transport.jms.DestinationType=topic" />
+ </endpoint>
+ </send>
+ </case>
+ <case regex="MSFT">
+ <send>
+ <endpoint>
+ <address
+ uri="jms:/mdd.MSFT?transport.jms.ConnectionFactoryJNDIName=TopicConnectionFactory&amp;java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;java.naming.provider.url=tcp://localhost:61616&amp;transport.jms.DestinationType=topic" />
+ </endpoint>
+ </send>
+ </case>
+ <default></default>
+ </switch>
+ </in>
+ </sequence>
+</definitions></pre>
+<p></p>
+<p>
+<strong>Objective: Demonstrate CBR (content based routing) of a message with a binary payload</strong> </p>
+<p>
+<strong>Prerequisites:</strong> </p>
+<p>Make sure the synapse-samples-1.0.jar is in your class path (by default this jar is placed in the lib directory when installing Synapse). </p>
+<p>Configure the JMS transport using ActiveMQ (refer to the <a href="Synapse_Samples_Setup.html">Sample Configuration Guide </a>) </p>
+<p>Start Synapse with the sample configuration 381 (i.e. synapse -sample 381) </p>
+
+<p></p>
+<p>In this configuration, a proxy has been configured to accept incoming JMS messages that carry a binary payload. The user configures the offset, length and binary encoding of the text literal to be used for CBR, along with a variable name under which the decoded value is set as a property. The configuration then simply routes each message to a different endpoint based on that text. </p>
+<p>A JMS producer and two instances of a consumer are used to demonstrate the CBR functionality.</p>
+<p>Now run the first consumer using the following command. </p>
+<pre xml:space="preserve">ant mddconsumer -Djms_topic=mdd.MSFT</pre>
+<p>Now run the second consumer using the following command. </p>
+<pre xml:space="preserve">ant mddconsumer -Djms_topic=mdd.GOOG</pre>
+<p>Now run the market data producer to generate market data for symbol 'MSFT' using the following command. </p>
+<pre xml:space="preserve">ant mddproducer -Dsymbol=MSFT</pre>
+<p>Now run the market data producer to generate market data for symbol 'GOOG' using the following command. </p>
+<pre xml:space="preserve">ant mddproducer -Dsymbol=GOOG</pre>
+<p>You will see the below output in the client console(s) based on the symbol. </p>
+<pre xml:space="preserve">mddconsumer:
+ [java] Market data recived for symbol : topic://mdd.MSFT
+ [java] Market data recived for symbol : topic://mdd.MSFT</pre>
+<p></p>
<h2>
<a name="XQuery" id="XQuery">Evaluating XQuery for mediation (XQuery Mediator)</a></h2>
<h2>