You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2009/03/13 14:48:03 UTC

svn commit: r753249 - /servicemix/smx4/nmr/trunk/nmr/audit/src/main/java/org/apache/servicemix/nmr/audit/jdbc/JdbcAuditor.java

Author: gnodet
Date: Fri Mar 13 13:48:03 2009
New Revision: 753249

URL: http://svn.apache.org/viewvc?rev=753249&view=rev
Log:
SMX4NMR-22: ignore non serializable properties in exchange and messages

Modified:
    servicemix/smx4/nmr/trunk/nmr/audit/src/main/java/org/apache/servicemix/nmr/audit/jdbc/JdbcAuditor.java

Modified: servicemix/smx4/nmr/trunk/nmr/audit/src/main/java/org/apache/servicemix/nmr/audit/jdbc/JdbcAuditor.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/nmr/audit/src/main/java/org/apache/servicemix/nmr/audit/jdbc/JdbcAuditor.java?rev=753249&r1=753248&r2=753249&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/nmr/audit/src/main/java/org/apache/servicemix/nmr/audit/jdbc/JdbcAuditor.java (original)
+++ servicemix/smx4/nmr/trunk/nmr/audit/src/main/java/org/apache/servicemix/nmr/audit/jdbc/JdbcAuditor.java Fri Mar 13 13:48:03 2009
@@ -21,8 +21,14 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.Serializable;
 import java.sql.Connection;
 import java.sql.SQLException;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.List;
+import java.util.ArrayList;
 
 import javax.sql.DataSource;
 
@@ -30,6 +36,8 @@
 import org.apache.servicemix.jdbc.JDBCAdapterFactory;
 import org.apache.servicemix.jdbc.Statements;
 import org.apache.servicemix.nmr.api.Exchange;
+import org.apache.servicemix.nmr.api.Type;
+import org.apache.servicemix.nmr.api.Message;
 import org.apache.servicemix.nmr.audit.AbstractAuditor;
 import org.apache.servicemix.nmr.audit.AuditorException;
 import org.springframework.beans.factory.InitializingBean;
@@ -57,6 +65,8 @@
     private String tableName = "SM_AUDIT";
     private JDBCAdapter adapter;
     private boolean createDataBase = true;
+    private Set<String> nonSerializableClasses = new HashSet<String>();
+    private ClassLoader tccl;
     
     public String getDescription() {
         return "JDBC Auditing Service";
@@ -93,6 +103,7 @@
         } finally {
             close(connection, restoreAutoCommit);
         }
+        this.tccl = Thread.currentThread().getContextClassLoader();
     }
     
     public void exchangeSent(Exchange exchange) {
@@ -106,11 +117,7 @@
                     connection.setAutoCommit(false);
                     restoreAutoCommit = true;
                 }
-                ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                ObjectOutputStream os = new ObjectOutputStream(baos);
-                os.writeObject(exchange);
-                os.close();
-                store(connection, id, baos.toByteArray());
+                store(connection, id, getDataForExchange(exchange));
                 connection.commit();
             } finally {
                 close(connection, restoreAutoCommit);
@@ -119,7 +126,73 @@
             log.error("Could not persist exchange", e);
         }
     }
-    
+
+    protected byte[] getDataForExchange(Exchange exchange) throws IOException { // Java-serializes the exchange into the byte[] that exchangeSent hands to store()
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream os = new ObjectOutputStream(baos);
+        os.writeObject(checkSerializable(exchange)); // writes whatever checkSerializable returns, which may be a stripped copy instead of the argument
+        os.close(); // close (and thereby flush) the object stream before the buffer is read
+        return baos.toByteArray();
+    }
+
+    protected Exchange checkSerializable(Exchange exchange) { // returns the exchange as-is, or a copy whose non-Serializable property/header values were removed
+        boolean isSerializable = isMapSerializable(exchange.getProperties());
+        if (isSerializable) { // properties passed; headers only need checking in that case
+            for (Type t : Type.values()) {
+                Message m = exchange.getMessage(t, false); // NOTE(review): presumably 'false' means "do not create a missing message" - confirm against Exchange API
+                if (m != null) {
+                    if (!isMapSerializable(m.getHeaders())) {
+                        isSerializable = false;
+                        break; // one offending header map is enough to take the copy path below
+                    }
+                }
+            }
+        }
+        if (!isSerializable) {
+            exchange = exchange.copy(); // entries are removed from the copy; NOTE(review): assumes copy() does not share its maps with the original - confirm
+            makeMapSerializable(exchange.getProperties());
+            for (Type t : Type.values()) {
+                Message m = exchange.getMessage(t, false);
+                if (m != null) {
+                    makeMapSerializable(m.getHeaders());
+                }
+            }
+        }
+        return exchange; // either the caller's instance (nothing to strip) or the cleaned copy
+    }
+
+    protected boolean isMapSerializable(Map<String,Object> map) { // true when every non-null value in the map implements Serializable
+        for (Object o : map.values()) {
+            if (o != null && !(o instanceof Serializable)) { // null values are accepted; only the value's own type is tested, not what it contains
+                return false;
+            }
+        }
+        return true; // also the result for an empty map
+    }
+
+    protected void makeMapSerializable(Map<String,Object> map) { // removes, in place, every entry whose non-null value is not Serializable
+        List<String> badEntries = new ArrayList<String>(); // keys are collected first so the map is not modified while its entry set is iterated
+        for (Map.Entry<String,Object> entry : map.entrySet()) {
+            if (entry.getValue() != null && !(entry.getValue() instanceof Serializable)) { // same test as isMapSerializable
+                warnAboutNonSerializableClass(entry.getValue());
+                badEntries.add(entry.getKey());
+            }
+        }
+        for (String key : badEntries) {
+            map.remove(key);
+        }
+    }
+
+    protected void warnAboutNonSerializableClass(Object o) { // logs a warning the first time a given class name is reported to this auditor
+        boolean added;
+        synchronized (nonSerializableClasses) { // the HashSet is guarded by its own monitor
+            added = nonSerializableClasses.add(o.getClass().getName()); // add() returns false when the name was already recorded
+        }
+        if (added) { // logging happens outside the synchronized block
+            log.warn("Properties of types '" + o.getClass().getName() + "' will be removed from the audit log as they are not serializable");
+        }
+    }
+
     protected void store(Connection connection, String id, byte[] data) throws Exception {
         if (adapter.doLoadData(connection, id) != null) {
             adapter.doUpdateData(connection, id, data);
@@ -220,11 +293,15 @@
     }
     
     protected Exchange getExchange(byte[] data) throws AuditorException {
+        ClassLoader oldCl = Thread.currentThread().getContextClassLoader(); // remember the calling thread's context class loader so it can be put back
         try {
+            Thread.currentThread().setContextClassLoader(tccl); // install the loader held in the tccl field for the duration of the read
             ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream(data));
             return (Exchange) is.readObject();
         } catch (Exception e) {
             throw new AuditorException("Unable to reconstruct exchange", e);
+        } finally { // runs on both the return path and the AuditorException path
+            Thread.currentThread().setContextClassLoader(oldCl); // restore the loader saved above
         }
     }