You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2008/08/27 10:46:47 UTC

svn commit: r689416 - in /servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr: AsynchronousAbstractAuditor.java JcrAuditor.java SlingJcrAuditorStrategy.java

Author: gertv
Date: Wed Aug 27 01:46:46 2008
New Revision: 689416

URL: http://svn.apache.org/viewvc?rev=689416&view=rev
Log:
Applying patch provided by Vladislav

Added:
    servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/AsynchronousAbstractAuditor.java
Modified:
    servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/JcrAuditor.java
    servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/SlingJcrAuditorStrategy.java

Added: servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/AsynchronousAbstractAuditor.java
URL: http://svn.apache.org/viewvc/servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/AsynchronousAbstractAuditor.java?rev=689416&view=auto
==============================================================================
--- servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/AsynchronousAbstractAuditor.java (added)
+++ servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/AsynchronousAbstractAuditor.java Wed Aug 27 01:46:46 2008
@@ -0,0 +1,39 @@
+package org.apache.servicemix.audit.jcr;
+
+import javax.jbi.JBIException;
+
+import org.apache.servicemix.jbi.audit.AbstractAuditor;
+import org.apache.servicemix.jbi.event.ExchangeEvent;
+
+/**
+ * Abstract base class meant to take care of all the serialization and
+ * multi-threading stuff for auditors. In this revision it only forwards
+ * exchange events to the subclass hooks, directly on the calling thread.
+ * @author vkrejcirik
+ * 
+ */
+public abstract class AsynchronousAbstractAuditor extends AbstractAuditor {
+
+    /** Starts the auditor; for now this only delegates to the parent class. */
+    public void doStart() throws JBIException {
+        // NOTE: no queue or worker thread is set up here in this revision
+        
+        
+        super.doStart();
+    }
+    
+    /** Forwards the event to {@link #onExchangeSent(ExchangeEvent)}. */
+    public void exchangeSent(ExchangeEvent event) {      
+        onExchangeSent(event);
+    }
+    /** Forwards the event to the subclass hook first, then to the parent class. */
+    public void exchangeAccepted(ExchangeEvent event) {
+        onExchangeAccepted(event);
+        super.exchangeAccepted(event);
+    }
+    /** Hook called for every event passed to {@link #exchangeSent(ExchangeEvent)}. */
+    public abstract void onExchangeSent(ExchangeEvent event);
+    /** Hook called for every event passed to {@link #exchangeAccepted(ExchangeEvent)}. */
+    public abstract void onExchangeAccepted(ExchangeEvent event);
+
+}

Modified: servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/JcrAuditor.java
URL: http://svn.apache.org/viewvc/servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/JcrAuditor.java?rev=689416&r1=689415&r2=689416&view=diff
==============================================================================
--- servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/JcrAuditor.java (original)
+++ servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/JcrAuditor.java Wed Aug 27 01:46:46 2008
@@ -10,7 +10,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.jbi.audit.AbstractAuditor;
 import org.apache.servicemix.jbi.audit.AuditorException;
 import org.apache.servicemix.jbi.event.ExchangeEvent;
 
@@ -19,88 +18,99 @@
  * JCR auditor
  * 
  * @author vkrejcirik
- *
+ * 
  */
-public class JcrAuditor extends AbstractAuditor {
-	
-	private static final Log LOG = LogFactory.getLog(JcrAuditor.class);
-	
-	private Repository repository;
-	private ThreadLocal<Session> session = new ThreadLocal<Session>();
-	private JcrAuditorStrategy strategy;
-	
-	@Override
-	protected void doStart() throws JBIException {
-		if (repository == null) {
-			throw new JBIException("No repository configured, unable to start JCR auditor");
-		}
-		if (strategy == null) {
-			throw new JBIException("No JcrAuditorStrategy configure, unable to start JCR auditor");
-		}
-		super.doStart();
-	}
-	
-	/**
-	 * Open a session with the JCR Repository 
-	 */
-	protected Session getSession() throws LoginException, RepositoryException {
-		if (session .get() == null) {
-			Session session = repository.login(new SimpleCredentials("admin", "admin".toCharArray()));
-			this.session.set(session);
-		}
-		return session.get();
-	}
-	
-	public void exchangeSent(ExchangeEvent event) {
-		try {
-			strategy.processExchange(event.getExchange(), getSession());
-			getSession().save();
-			LOG.info("Successfully stored information about message exchange " + event.getExchange().getExchangeId() + " in the JCR repository");
-		} catch (Exception e) {
-			LOG.error("Unable to store information about message exchange " + event.getExchange().getExchangeId(), e);
-		}
-	}
-	
-	public String getDescription() {
-		return "ServiceMix JCR Auditor";
-	}
-	
-	// just some setters and getters
-	/**
-	 * Configure the JCR Repository to connect to
-	 */
-	public void setRepository(Repository repository) {
-		this.repository = repository;
-	}
-	
-	/**
-	 * Configure the {@link JcrAuditorStrategy} to use
-	 * @param strategy
-	 */
-	public void setStrategy(JcrAuditorStrategy strategy) {
-		this.strategy = strategy;
-	}
-
-	//to be implemented
-	@Override
-	public int deleteExchangesByIds(String[] arg0) throws AuditorException {
-		return 0;
-	}
-
-	@Override
-	public int getExchangeCount() throws AuditorException {
-		return 0;
-	}
-
-	@Override
-	public String[] getExchangeIdsByRange(int arg0, int arg1)
-			throws AuditorException {
-		return null;
-	}
-
-	@Override
-	public MessageExchange[] getExchangesByIds(String[] arg0)
-			throws AuditorException {
-		return null;
-	}
+public class JcrAuditor extends AsynchronousAbstractAuditor {
+
+    private static final Log LOG = LogFactory.getLog(JcrAuditor.class);
+
+    /** Repository that audit data is written to; must be set before start. */
+    private Repository repository;
+    /** One JCR session per calling thread, opened lazily by {@link #getSession()}. */
+    private ThreadLocal<Session> session = new ThreadLocal<Session>();
+    /** Strategy that stores a message exchange in the repository; must be set before start. */
+    private JcrAuditorStrategy strategy;
+
+    /** Refuses to start unless both a repository and a strategy are configured. */
+    @Override
+    public void doStart() throws JBIException {
+        if (repository == null) {
+            throw new JBIException("No repository configured, unable to start JCR auditor");
+        }
+        if (strategy == null) {
+            throw new JBIException("No JcrAuditorStrategy configured, unable to start JCR auditor");
+        }
+        super.doStart();
+    }
+
+    /** Returns the calling thread's JCR session, logging in to the repository on first use. */
+    protected Session getSession() throws LoginException, RepositoryException {
+        if (session.get() == null) {
+            // NOTE(review): credentials are hard-coded; consider making them configurable
+            session.set(repository.login(new SimpleCredentials("admin", "admin".toCharArray())));
+        }
+        return session.get();
+    }
+
+    /**
+     * Hands the exchange to the configured strategy and saves the session.
+     * Any failure is logged together with the exchange id instead of being rethrown.
+     */
+    public void onExchangeSent(ExchangeEvent event) {
+        try {
+            strategy.processExchange(event.getExchange(), getSession());
+            getSession().save();
+            LOG.info("Successfully stored information about message exchange "
+                    + event.getExchange().getExchangeId() + " in the JCR repository");
+        } catch (Exception e) {
+            LOG.error("Unable to store information about message exchange "
+                    + event.getExchange().getExchangeId(), e);
+        }
+    }
+
+    /** Does nothing: this class stores no data for accepted exchanges. */
+    @Override
+    public void onExchangeAccepted(ExchangeEvent event) {
+    }
+
+    public String getDescription() {
+        return "ServiceMix JCR Auditor";
+    }
+
+    /**
+     * Configure the JCR Repository to connect to
+     */
+    public void setRepository(Repository repository) {
+        this.repository = repository;
+    }
+
+    /**
+     * Configure the {@link JcrAuditorStrategy} to use
+     * 
+     * @param strategy the strategy that stores each exchange
+     */
+    public void setStrategy(JcrAuditorStrategy strategy) {
+        this.strategy = strategy;
+    }
+
+    // The delete and query operations below are still to be implemented.
+    @Override
+    public int deleteExchangesByIds(String[] arg0) throws AuditorException {
+        return 0;
+    }
+
+    @Override
+    public int getExchangeCount() throws AuditorException {
+        return 0;
+    }
+
+    @Override
+    public String[] getExchangeIdsByRange(int arg0, int arg1) throws AuditorException {
+        return null;
+    }
+
+    @Override
+    public MessageExchange[] getExchangesByIds(String[] arg0) throws AuditorException {
+        return null;
+    }
 }

Modified: servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/SlingJcrAuditorStrategy.java
URL: http://svn.apache.org/viewvc/servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/SlingJcrAuditorStrategy.java?rev=689416&r1=689415&r2=689416&view=diff
==============================================================================
--- servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/SlingJcrAuditorStrategy.java (original)
+++ servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/SlingJcrAuditorStrategy.java Wed Aug 27 01:46:46 2008
@@ -3,7 +3,8 @@
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
-import java.util.GregorianCalendar;
+import java.util.GregorianCalendar;
+import java.util.Locale;
 
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
@@ -30,195 +31,272 @@
  * @author vkrejcirik
  * 
  */
-public class SlingJcrAuditorStrategy implements JcrAuditorStrategy {
-
-	public static final String RESOURCE_TYPE = "sling:resourceType";
-	public static final String EXCHANGES_RESOURCE_TYPE = "servicemix/exchanges";
-	public static final String EXCHANGE_RESOURCE_TYPE = "servicemix/exchange";
-	public static final String MESSAGE_FLOW_RESOURCE_TYPE = "servicemix/message_flow";
-	public static final String NORMALIZED_MESSAGE_RESOURCE_TYPE = "servicemix/normalizedmessage";
-	public static final String CONTENT_EXCHANGES_TYPE = "content/servicemix/exchanges";
-	
-	private static final SourceTransformer TRANSFORMER = new SourceTransformer();
-
-	// let's time slice our message exchange archive on an hourly basis
-	private static final DateFormat FORMAT = new SimpleDateFormat("yyyy-MM-dd hha");
-
-	public void processExchange(MessageExchange messageExchange, Session session)
-			throws ItemExistsException, PathNotFoundException,
-			VersionException, ConstraintViolationException, LockException,
-			RepositoryException, MessagingException, TransformerException {
-
-		Node node = getNodeForExchange(messageExchange, session);
-
-		node.setProperty("ExchangeStatus", messageExchange.getStatus()
-				.toString());
-		node.setProperty("Pattern", messageExchange.getPattern().toString());
-
-		if (messageExchange.getEndpoint() != null) {
-			node.setProperty("Endpoint", messageExchange.getEndpoint()
-					.getEndpointName());
-		}
-
-		if (messageExchange.getService() != null) {
-			node
-					.setProperty("Service", messageExchange.getService()
-							.toString());
-		}
-
-		for (Object key : messageExchange.getPropertyNames()) {
-			String name = (String) key;
-
-			node
-					.setProperty(name, messageExchange.getProperty(name)
-							.toString());
-		}
-
-		addNormalizedMessages(node, messageExchange);
-		node.setProperty("Updated", new DateValue(new GregorianCalendar()));
-
-	}
-
-	private Node getNodeForExchange(MessageExchange exchange, Session session)
-			throws ItemExistsException, PathNotFoundException,
-			VersionException, ConstraintViolationException, LockException,
-			RepositoryException {
-
-		String id = exchange.getExchangeId().replaceAll(":", "_");
-		String corr_id = exchange.getProperty(
-				"org.apache.servicemix.correlationId").toString().replaceAll(":", "_");
-
-		// node with date
-		Node parent = getExchangeBaseNode(session);
-
-		// node with correlation id
-		Node parent_corr = getCorrelationIdNode(parent, corr_id, id);
-
-		try {
-			return parent_corr.getNode(id);
-
-		} catch (PathNotFoundException e) {
-			Node node = parent_corr.addNode(id);
-			node.setProperty(RESOURCE_TYPE, EXCHANGE_RESOURCE_TYPE);
-
-			node.setProperty("Created", new DateValue(new GregorianCalendar()));
-
-			node.addMixin("mix:versionable");
-			return node;
-		}
-	}
-
-	private synchronized Node getCorrelationIdNode(Node parent, String corr_id,
-			String id) throws RepositoryException, ValueFormatException,
-			VersionException, LockException, ConstraintViolationException {
-
-		//first exchange of the flow
-		if (corr_id == null) {
-			Node node = parent.addNode(id);
-			node.setProperty(RESOURCE_TYPE, EXCHANGES_RESOURCE_TYPE);
-			
-			node.setProperty("Created", new DateValue(new GregorianCalendar()));
-			node.setProperty("CorrelationId", corr_id);
-			return node;
-		}
-
-		try {
-			return parent.getNode(corr_id);
-		
-		} catch (PathNotFoundException e) {
-			
-			Node node = parent.addNode(id);
-			node.setProperty(RESOURCE_TYPE, EXCHANGES_RESOURCE_TYPE);
-			
-			node.setProperty("Created", new DateValue(new GregorianCalendar()));
-			node.setProperty("CorrelationId", corr_id);
-			
-			return node;
-		}
-	}
-
-	/**
-	 * 
-	 * Get base node for message exchange
-	 * 
-	 * @param session
-	 * @return Node
-	 * @throws RepositoryException
-	 */
-	private Node getExchangeBaseNode(Session session)
-			throws RepositoryException {
-		Node exchanges = session.getRootNode().getNode(CONTENT_EXCHANGES_TYPE);
-
-		return createOrGet(exchanges, FORMAT.format(new Date()));
-	}
-
-	/**
-	 * 
-	 * Create or get node with the path
-	 * 
-	 * @param exchanges
-	 * @param path
-	 * @return Node
-	 * @throws ValueFormatException
-	 * @throws VersionException
-	 * @throws LockException
-	 * @throws ConstraintViolationException
-	 * @throws RepositoryException
-	 */
-	private synchronized Node createOrGet(Node exchanges, String path)
-			throws ValueFormatException, VersionException, LockException,
-			ConstraintViolationException, RepositoryException {
-		try {
-			return exchanges.getNode(path);
-
-		} catch (PathNotFoundException e) {
-			Node node = exchanges.addNode(path);
-			node.setProperty(RESOURCE_TYPE, MESSAGE_FLOW_RESOURCE_TYPE);
-
-			node.setProperty("Created", new DateValue(new GregorianCalendar()));
-			return node;
-		}
-	}
-
-	private void addNormalizedMessages(Node node, MessageExchange exchange)
-			throws ItemExistsException, PathNotFoundException,
-			VersionException, ConstraintViolationException, LockException,
-			MessagingException, RepositoryException, TransformerException {
-		if (exchange.getMessage("in") != null) {
-			addNormalizedMessages(node, "In", exchange.getMessage("in"));
-		}
-		if (exchange.getMessage("out") != null) {
-			addNormalizedMessages(node, "Out", exchange.getMessage("out"));
-		}
-		if (exchange.getMessage("fault") != null) {
-			addNormalizedMessages(node, "Fault", exchange.getMessage("fault"));
-		}
-	}
-
-	private void addNormalizedMessages(Node parent, String type,
-			NormalizedMessage message) throws ItemExistsException,
-			PathNotFoundException, VersionException,
-			ConstraintViolationException, LockException, RepositoryException,
-			MessagingException, TransformerException {
-		if (message != null) {
-			Node node;
-			try {
-				node = parent.getNode(type);
-			} catch (PathNotFoundException e) {
-				node = parent.addNode(type);
-			}
-			node.setProperty("Content", getNormalizedMessageContent(message));
-			for (Object key : message.getPropertyNames()) {
-				String name = (String) key;
-				node.setProperty(name, message.getProperty(name).toString());
-			}
-			node.setProperty(RESOURCE_TYPE, NORMALIZED_MESSAGE_RESOURCE_TYPE);
-		}
-	}
-
-	private String getNormalizedMessageContent(NormalizedMessage message)
-			throws MessagingException, TransformerException {
-		MessageUtil.enableContentRereadability(message);
-		return TRANSFORMER.toString(message.getContent());
-	}
-}
+public class SlingJcrAuditorStrategy implements JcrAuditorStrategy {
+  
+     public static final String RESOURCE_TYPE = "sling:resourceType";
+     // Values written to the sling:resourceType property of the nodes created by
+     // this strategy (per the original note: esp files for rendering)
+     public static final String EXCHANGES_RESOURCE_TYPE = "servicemix/exchanges";
+     public static final String EXCHANGE_RESOURCE_TYPE = "servicemix/exchange";
+     public static final String MESSAGE_FLOW_RESOURCE_TYPE = "servicemix/message_flow";
+     public static final String NORMALIZED_MESSAGE_RESOURCE_TYPE = "servicemix/normalizedmessage";
+  
+     // content locations, resolved relative to the session's root node
+     public static final String CONTENT_MESSAGE_FLOWS_TYPE = "content/servicemix/message_flows";
+     public static final String CONTENT_EXCHANGES_TYPE = "content/servicemix/exchanges";
+  
+     private static final SourceTransformer TRANSFORMER = new SourceTransformer();
+     // NOTE: SimpleDateFormat is not thread-safe; concurrent use of FORMAT needs external locking
+     // let's time slice our message exchange archive on an hourly basis (12-hour clock plus AM/PM marker)
+     private static final DateFormat FORMAT = new SimpleDateFormat("yyyyMMddhha", Locale.ENGLISH);
+  
+     /**
+      * Stores the state of a message exchange on its repository node: status,
+      * pattern, endpoint, service, exchange properties and normalized messages.
+      */
+     public void processExchange(MessageExchange messageExchange, Session session)
+             throws ItemExistsException, PathNotFoundException,
+             VersionException, ConstraintViolationException, LockException,
+             RepositoryException, MessagingException, TransformerException {
+         Node exchangeNode = getNodeForExchange(messageExchange, session);
+         exchangeNode.setProperty("ExchangeStatus", messageExchange.getStatus().toString());
+         exchangeNode.setProperty("Pattern", messageExchange.getPattern().toString());
+
+         // endpoint and service are only recorded when the exchange carries them
+         if (messageExchange.getEndpoint() != null) {
+             exchangeNode.setProperty("Endpoint", messageExchange.getEndpoint().getEndpointName());
+         }
+         if (messageExchange.getService() != null) {
+             exchangeNode.setProperty("Service", messageExchange.getService().toString());
+         }
+         // every exchange property is stored under its own name, as a string
+         for (Object propertyName : messageExchange.getPropertyNames()) {
+             String key = (String) propertyName;
+             exchangeNode.setProperty(key, messageExchange.getProperty(key).toString());
+         }
+         addNormalizedMessages(exchangeNode, messageExchange);
+         exchangeNode.setProperty("Updated", new DateValue(new GregorianCalendar()));
+     }
+  
+     /**
+      * Returns the node that represents the exchange, adding it on first use.
+      * The node lives below the current time-slice node and the node of the
+      * message flow (correlation id) the exchange belongs to.
+      * @param exchange the exchange to look up
+      * @param session the JCR session to work in
+      * @return Node the existing or newly added node named after the exchange id
+      * @throws ItemExistsException
+      * @throws PathNotFoundException
+      * @throws VersionException
+      * @throws ConstraintViolationException
+      * @throws LockException
+      * @throws RepositoryException
+      */
+     private Node getNodeForExchange(MessageExchange exchange, Session session)
+             throws ItemExistsException, PathNotFoundException,
+             VersionException, ConstraintViolationException, LockException,
+             RepositoryException {
+
+         String id = exchange.getExchangeId().replaceAll(":", "_");
+         // the correlation id may be absent (first exchange of a flow); pass null
+         // on in that case instead of failing with a NullPointerException here
+         Object corr = exchange.getProperty("org.apache.servicemix.correlationId");
+         String corr_id = corr == null ? null : corr.toString().replaceAll(":", "_");
+
+         // node with date
+         Node parent = getMessageFlowBaseNode(session);
+
+         // node with correlation id
+         Node parent_corr = getCorrelationIdNode(parent, corr_id, id);
+
+         try {
+             return parent_corr.getNode(id);
+         } catch (PathNotFoundException e) {
+             // no node for this exchange yet: add it and mark it versionable
+             Node node = parent_corr.addNode(id);
+             node.setProperty(RESOURCE_TYPE, EXCHANGE_RESOURCE_TYPE);
+
+             node.setProperty("Created", new DateValue(new GregorianCalendar()));
+
+             node.addMixin("mix:versionable");
+             return node;
+         }
+     }
+  
+     /**
+      * Returns the node of the message flow an exchange belongs to, adding it
+      * when it does not exist yet. The node is named after the correlation id,
+      * or after the exchange id when there is no correlation id, and records
+      * that name in its "CorrelationId" property.
+      * 
+      * @param parent node below which the flow nodes are kept
+      * @param corr_id correlation id of the exchange, may be null
+      * @param id id of the exchange itself
+      * @return Node the existing or newly added flow node
+      * @throws RepositoryException
+      * @throws ValueFormatException
+      * @throws VersionException
+      * @throws LockException
+      * @throws ConstraintViolationException
+      */
+     private synchronized Node getCorrelationIdNode(Node parent, String corr_id,
+             String id) throws RepositoryException, ValueFormatException,
+             VersionException, LockException, ConstraintViolationException {
+
+         // first exchange of the flow: it carries no correlation id, so its own
+         // id names the flow node (presumably what later exchanges of the flow
+         // carry as their correlation id -- TODO confirm)
+         String flowId = corr_id == null ? id : corr_id;
+
+         try {
+             // reuse the flow node when an earlier event already added it
+             return parent.getNode(flowId);
+
+         } catch (PathNotFoundException e) {
+
+             // no node for this flow yet: add it and record when and for which
+             // correlation id it was created
+             Node node = parent.addNode(flowId);
+             node.setProperty(RESOURCE_TYPE, MESSAGE_FLOW_RESOURCE_TYPE);
+
+             node.setProperty("Created", new DateValue(new GregorianCalendar()));
+             node.setProperty("CorrelationId", flowId);
+
+             return node;
+         }
+     }
+ 
+     /**
+      * Get base node for message flow: the node of the current time slice
+      * below CONTENT_MESSAGE_FLOWS_TYPE, obtained through createOrGet.
+      * @param session the JCR session whose root node is used for the lookup
+      * @return Node the node for the current time slice
+      * @throws RepositoryException
+      */
+     private Node getMessageFlowBaseNode(Session session) throws RepositoryException {
+         Node exchanges = session.getRootNode().getNode(CONTENT_MESSAGE_FLOWS_TYPE);
+         String slice;
+         synchronized (FORMAT) { // SimpleDateFormat is not thread-safe and FORMAT is shared
+             slice = FORMAT.format(new Date());
+         }
+         return createOrGet(exchanges, slice);
+     }
+
+     /*
+      * private Node getExchangeBaseNode(Session session) throws
+      * RepositoryException { Node exchanges =
+      * session.getRootNode().getNode(CONTENT_EXCHANGES_TYPE);
+      * 
+      * return createOrGet(exchanges, FORMAT.format(new Date())); }
+      */
+
+     /**
+      * Looks up the child node at the given relative path, adding it (typed as
+      * EXCHANGES_RESOURCE_TYPE and stamped with a "Created" date) when absent.
+      * 
+      * @param exchanges node below which the child is looked up
+      * @param path relative path of the child node
+      * @return Node the existing or newly added child
+      * @throws ValueFormatException
+      * @throws VersionException
+      * @throws LockException
+      * @throws ConstraintViolationException
+      * @throws RepositoryException
+      */
+     private synchronized Node createOrGet(Node exchanges, String path)
+             throws ValueFormatException, VersionException, LockException,
+             ConstraintViolationException, RepositoryException {
+         Node result;
+         try {
+             result = exchanges.getNode(path);
+         } catch (PathNotFoundException notThereYet) {
+             // nothing at this path so far: add the node and stamp it
+             result = exchanges.addNode(path);
+             result.setProperty(RESOURCE_TYPE, EXCHANGES_RESOURCE_TYPE);
+             result.setProperty("Created", new DateValue(new GregorianCalendar()));
+         }
+         return result;
+     }
+
+     /**
+      * Stores the "in", "out" and "fault" messages of the exchange, when
+      * present, as the child nodes "In", "Out" and "Fault" of the given
+      * exchange node.
+      * 
+      * @param node the node of the exchange
+      * @param exchange the exchange whose messages are stored
+      * @throws ItemExistsException
+      * @throws PathNotFoundException
+      * @throws VersionException
+      * @throws ConstraintViolationException
+      * @throws LockException
+      * @throws MessagingException
+      * @throws RepositoryException
+      * @throws TransformerException
+      */
+     private void addNormalizedMessages(Node node, MessageExchange exchange)
+             throws ItemExistsException, PathNotFoundException,
+             VersionException, ConstraintViolationException, LockException,
+             MessagingException, RepositoryException, TransformerException {
+
+         // message name on the exchange -> name of the child node that stores it
+         String[][] kinds = {{"in", "In"}, {"out", "Out"}, {"fault", "Fault"}};
+         for (String[] kind : kinds) {
+             if (exchange.getMessage(kind[0]) != null) {
+                 addNormalizedMessages(node, kind[1], exchange.getMessage(kind[0]));
+             }
+         }
+     }
+ 
+     /**
+      * Stores one normalized message (content and properties) on the child
+      * node with the given name, adding that node first when it is missing.
+      * @param parent the node of the exchange
+      * @param type name of the child node ("In", "Out" or "Fault")
+      * @param message the message to store; nothing happens when it is null
+      * @throws ItemExistsException
+      * @throws PathNotFoundException
+      * @throws VersionException
+      * @throws ConstraintViolationException
+      * @throws LockException
+      * @throws RepositoryException
+      * @throws MessagingException
+      * @throws TransformerException
+      */
+     private void addNormalizedMessages(Node parent, String type,
+             NormalizedMessage message) throws ItemExistsException,
+             PathNotFoundException, VersionException,
+             ConstraintViolationException, LockException, RepositoryException,
+             MessagingException, TransformerException {
+         if (message == null) {
+             return;
+         }
+         Node messageNode;
+         try {
+             messageNode = parent.getNode(type);
+         } catch (PathNotFoundException missing) {
+             messageNode = parent.addNode(type);
+         }
+         messageNode.setProperty("Content", getNormalizedMessageContent(message));
+         for (Object propertyName : message.getPropertyNames()) {
+             String key = (String) propertyName;
+             messageNode.setProperty(key, message.getProperty(key).toString());
+         }
+         messageNode.setProperty(RESOURCE_TYPE, NORMALIZED_MESSAGE_RESOURCE_TYPE);
+     }
+
+     /**
+      * Returns the content of the message rendered as a string.
+      * Content re-readability is enabled on the message (via MessageUtil)
+      * before the content is handed to the transformer.
+      * @param message the message whose content is rendered
+      * @return String the string produced by the transformer for the content
+      * @throws MessagingException
+      * @throws TransformerException
+      */
+     private String getNormalizedMessageContent(NormalizedMessage message)
+             throws MessagingException, TransformerException {
+         MessageUtil.enableContentRereadability(message);
+         return TRANSFORMER.toString(message.getContent());
+     }
+}