You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ode.apache.org by va...@apache.org on 2013/04/10 15:48:12 UTC

git commit: fixing jackson-based serialization/deserialization.

Updated Branches:
  refs/heads/master 05e07f7bd -> 0f7d9afd5


fixing jackson-based serialization/deserialization.


Project: http://git-wip-us.apache.org/repos/asf/ode-jacob/repo
Commit: http://git-wip-us.apache.org/repos/asf/ode-jacob/commit/0f7d9afd
Tree: http://git-wip-us.apache.org/repos/asf/ode-jacob/tree/0f7d9afd
Diff: http://git-wip-us.apache.org/repos/asf/ode-jacob/diff/0f7d9afd

Branch: refs/heads/master
Commit: 0f7d9afd5ed440e275d537d33b27424bc6dc15a0
Parents: 05e07f7
Author: Tammo van Lessen <tv...@gmail.com>
Authored: Wed Apr 10 15:47:34 2013 +0200
Committer: Tammo van Lessen <tv...@gmail.com>
Committed: Wed Apr 10 15:47:34 2013 +0200

----------------------------------------------------------------------
 pom.xml                                            |    9 +++
 .../soup/jackson/ChannelProxyDeserializer.java     |    6 +-
 .../soup/jackson/JacksonExecutionQueueImpl.java    |   13 ++--
 .../JacobJacksonAnnotationIntrospector.java        |   19 +++++
 .../soup/jackson/JacobTypeResolverBuilder.java     |   36 +++++----
 .../apache/ode/jacob/vpu/ExecutionQueueImpl.java   |    3 +
 .../java/org/apache/ode/jacob/vpu/JacobVPU.java    |    4 +
 .../ode/jacob/examples/helloworld/HelloWorld.java  |   51 ++++++++-----
 .../ode/jacob/examples/sequence/Sequence.java      |   56 +++++++++------
 src/test/resources/log4j.properties                |    4 +-
 10 files changed, 135 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/0f7d9afd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b3b1f97..e81c28c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,6 +55,11 @@
 	      <artifactId>jackson-databind</artifactId>
 	      <version>${jackson.version}</version>
 	  </dependency>
+      <dependency>
+          <groupId>com.fasterxml.jackson.dataformat</groupId>
+          <artifactId>jackson-dataformat-smile</artifactId>
+          <version>${jackson.version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 
@@ -67,6 +72,10 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-smile</artifactId>
+    </dependency>
 
     <!-- test -->
     <dependency>

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/0f7d9afd/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxyDeserializer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxyDeserializer.java b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxyDeserializer.java
index 1b85bcc..069882b 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxyDeserializer.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/ChannelProxyDeserializer.java
@@ -67,9 +67,9 @@ public class ChannelProxyDeserializer extends StdDeserializer<Channel> {
 
         
         try {
-            CommChannel channel = new CommChannel(ctxt.findClass(type));
-            channel.setId(id);
-            return (Channel)ChannelFactory.createChannel(channel, channel.getType());
+            CommChannel cchannel = new CommChannel(ctxt.findClass(type));
+            cchannel.setId(id);
+            return (Channel)ChannelFactory.createChannel(cchannel, cchannel.getType());
 
         } catch (ClassNotFoundException e) {
             throw ctxt.instantiationException(Channel.class, e);

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/0f7d9afd/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
index ea1376e..e0892df 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacksonExecutionQueueImpl.java
@@ -55,7 +55,7 @@ public class JacksonExecutionQueueImpl extends ExecutionQueueImpl {
 		super(null);
 	}
 	
-    public static ObjectMapper configureMapper() {
+    public static void configureMapper(ObjectMapper om) {
         
         SimpleModule sm = new SimpleModule("jacobmodule");
         sm.addSerializer(ChannelProxy.class, new ChannelProxySerializer());
@@ -65,7 +65,6 @@ public class JacksonExecutionQueueImpl extends ExecutionQueueImpl {
         sm.addDeserializer(Continuation.class, new ContinuationDeserializer());
         sm.addDeserializer(Channel.class, new ChannelProxyDeserializer());
         
-        ObjectMapper om = new ObjectMapper();
         om.registerModule(sm);
         om.disable(MapperFeature.AUTO_DETECT_CREATORS);
         om.disable(MapperFeature.AUTO_DETECT_GETTERS);
@@ -73,11 +72,11 @@ public class JacksonExecutionQueueImpl extends ExecutionQueueImpl {
         om.setVisibility(PropertyAccessor.FIELD, Visibility.ANY);
         
         om.setDefaultTyping(new JacobTypeResolverBuilder());
+        om.setAnnotationIntrospector(new JacobJacksonAnnotationIntrospector());
         
+        om.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
         om.enable(SerializationFeature.WRITE_ENUMS_USING_INDEX);
-        om.enable(SerializationFeature.INDENT_OUTPUT);
-        
-        return om;
+        //om.enable(SerializationFeature.INDENT_OUTPUT);
     }
 
 	
@@ -156,6 +155,10 @@ public class JacksonExecutionQueueImpl extends ExecutionQueueImpl {
                 }
 
             }
+
+            // Garbage collection
+            // TODO
+
             return soup;
         }
 

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/0f7d9afd/src/main/java/org/apache/ode/jacob/soup/jackson/JacobJacksonAnnotationIntrospector.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobJacksonAnnotationIntrospector.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobJacksonAnnotationIntrospector.java
new file mode 100644
index 0000000..0cb3a73
--- /dev/null
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobJacksonAnnotationIntrospector.java
@@ -0,0 +1,19 @@
+package org.apache.ode.jacob.soup.jackson;
+
+import com.fasterxml.jackson.annotation.ObjectIdGenerators;
+import com.fasterxml.jackson.databind.introspect.Annotated;
+import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
+import com.fasterxml.jackson.databind.introspect.ObjectIdInfo;
+
+public class JacobJacksonAnnotationIntrospector extends
+        JacksonAnnotationIntrospector {
+
+    private static final long serialVersionUID = 1L;
+
+    /* enable object ids for all objects. */
+    @Override
+    public ObjectIdInfo findObjectIdInfo(Annotated ann) {
+        return new ObjectIdInfo("@id", Object.class, ObjectIdGenerators.IntSequenceGenerator.class);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/0f7d9afd/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java
index a01d13e..b477fcd 100644
--- a/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java
+++ b/src/main/java/org/apache/ode/jacob/soup/jackson/JacobTypeResolverBuilder.java
@@ -23,6 +23,8 @@ import java.util.Collection;
 import org.apache.ode.jacob.Channel;
 import org.apache.ode.jacob.ChannelProxy;
 import org.apache.ode.jacob.JacobObject;
+import org.apache.ode.jacob.soup.CommChannel;
+import org.apache.ode.jacob.vpu.ChannelFactory;
 
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
@@ -34,11 +36,11 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.databind.jsontype.TypeDeserializer;
 import com.fasterxml.jackson.databind.jsontype.TypeIdResolver;
 import com.fasterxml.jackson.databind.jsontype.TypeSerializer;
-import com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer;
 import com.fasterxml.jackson.databind.jsontype.impl.ClassNameIdResolver;
 import com.fasterxml.jackson.databind.jsontype.impl.StdTypeResolverBuilder;
 import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase;
 import com.fasterxml.jackson.databind.type.TypeFactory;
+import com.fasterxml.jackson.databind.util.ClassUtil;
 
 public class JacobTypeResolverBuilder extends StdTypeResolverBuilder {
 
@@ -66,18 +68,14 @@ public class JacobTypeResolverBuilder extends StdTypeResolverBuilder {
     
     private boolean useForType(JavaType t) {
         if (JacobObject.class.isAssignableFrom(t.getRawClass())) {
-            //System.err.println("XXX: JO " + t);
             return true;
         }
         
         if (Channel.class.isAssignableFrom(t.getRawClass()))  {
-            //System.err.println("XXX: CH " + t);
             return true;
         }
         
-        //if (!t.isConcrete()) {
         if (t.getRawClass() == Object.class) {
-        	//System.err.println("XXX: CON " + t + "- " + t.isConcrete());
             return true;
         }
 
@@ -89,13 +87,9 @@ public class JacobTypeResolverBuilder extends StdTypeResolverBuilder {
             JavaType baseType, Collection<NamedType> subtypes) {
         
         if (useForType(baseType)) {
-            if (baseType.isInterface() && Channel.class.isAssignableFrom(baseType.getRawClass())) {
-                TypeIdResolver idRes = idResolver(config, baseType, subtypes, false, true);
-                return new AsPropertyTypeDeserializer(baseType, idRes,
-                        _typeProperty, _typeIdVisible, Channel.class);
-            } else {
-                return super.buildTypeDeserializer(config, baseType, subtypes);    
-            }
+            // set Channel as the default impl.
+            defaultImpl(Channel.class);
+            return super.buildTypeDeserializer(config, baseType, subtypes);
         }
         
         return null;
@@ -113,7 +107,9 @@ public class JacobTypeResolverBuilder extends StdTypeResolverBuilder {
 
         public String idFromValue(Object value) {
             if (value instanceof ChannelProxy) {
-                return "<<channelproxy>>";
+                CommChannel commChannel = (CommChannel) ChannelFactory.getBackend((Channel)value);
+                return commChannel.getType().getName();
+
             }
             return delegate.idFromValue(value);
         }
@@ -123,10 +119,18 @@ public class JacobTypeResolverBuilder extends StdTypeResolverBuilder {
         }
 
         public JavaType typeFromId(String id) {
-            if ("<<channelproxy>>".equals(id)) {
-                return null; // force jackson to use default impl
+            try {
+                Class<?> cls =  ClassUtil.findClass(id);
+                if (Channel.class.isAssignableFrom(cls) && cls.isInterface()) {
+                    // return null to force Jackson to use default deserializer (which is the ChannelProxyDeserializer)
+                    return null;
+                }
+                return _typeFactory.constructSpecializedType(_baseType, cls);
+            } catch (ClassNotFoundException e) {
+                throw new IllegalArgumentException("Invalid type id '"+id+"' (for id type 'Id.class'): no such class found");
+            } catch (Exception e) {
+                throw new IllegalArgumentException("Invalid type id '"+id+"' (for id type 'Id.class'): "+e.getMessage(), e);
             }
-            return delegate.typeFromId(id);
         }
 
         public Id getMechanism() {

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/0f7d9afd/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
index c95cfb3..fff2ae1 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/ExecutionQueueImpl.java
@@ -537,6 +537,9 @@ public class ExecutionQueueImpl implements ExecutionQueue {
         boolean replicated;
         public Set<CommFrame> commFrames = new HashSet<CommFrame>();
 
+        // default constructor for deserialization
+        public CommGroupFrame() {}
+
         public CommGroupFrame(boolean replicated) {
             this.replicated = replicated;
         }

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/0f7d9afd/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
index 455431f..5d7660c 100644
--- a/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
+++ b/src/main/java/org/apache/ode/jacob/vpu/JacobVPU.java
@@ -140,6 +140,10 @@ public final class JacobVPU {
         _executionQueue = executionQueue;
         _executionQueue.setClassLoader(_classLoader);
     }
+    
+    public ExecutionQueue getContext() {
+        return _executionQueue;
+    }
 
     public void registerExtension(Class<?> extensionClass, Object obj) {
         LOG.trace(">> setContext (extensionClass={}, obj={})", extensionClass, obj);

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/0f7d9afd/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
index 746f374..5fb0b70 100644
--- a/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
+++ b/src/test/java/org/apache/ode/jacob/examples/helloworld/HelloWorld.java
@@ -78,17 +78,26 @@ public class HelloWorld extends JacobRunnable {
 
         public void run() {
             Synch callback = newChannel(Synch.class, "callback channel to ACK " + str);
-            object(new ReceiveProcess() {
-                private static final long serialVersionUID = 1L;
-            }.setChannel(callback).setReceiver(new Synch() {
-                public void ret() {
-                    System.out.println(str + " ACKed");
-                }
-            }));
+            object(new ReliableStringEmitterReceiveProcess().setChannel(callback).setReceiver(new ReliableStringEmitterSynch(str)));
             to.invoke(str, callback);
         }
+
+        static class ReliableStringEmitterReceiveProcess extends ReceiveProcess {}
+        static class ReliableStringEmitterSynch implements Synch {
+            private String str;
+
+            @JsonCreator
+            public ReliableStringEmitterSynch(@JsonProperty("str") String str) {
+                this.str = str;
+            }
+
+            @Override
+            public void ret() {
+                System.out.println(str + " ACKed");
+            }
+        }
     }
-    
+
     static class PrinterProcess extends JacobRunnable {
         private Val _in;
 
@@ -212,16 +221,19 @@ public class HelloWorld extends JacobRunnable {
 
 		@Override
 		protected JacobRunnable doStep(int step, Synch done) {
-			return new SequenceItemEmitter(greetings[step], done);
+			return new SequenceItemEmitter(greetings[step], done, out);
         }
 
-		class SequenceItemEmitter extends JacobRunnable {
+		static class SequenceItemEmitter extends JacobRunnable {
 			private final String string;
 			private final Synch done;
+			private final Val out;
 
-			public SequenceItemEmitter(String string, Synch done) {
+			@JsonCreator
+			public SequenceItemEmitter(@JsonProperty("string") String string, @JsonProperty("done") Synch done, @JsonProperty("out") Val out) {
 				this.string = string;
 				this.done = done;
+				this.out = out;
 			}
 
 			@Override
@@ -241,14 +253,18 @@ public class HelloWorld extends JacobRunnable {
     }
 
     public static void main(String args[]) throws Exception {
+        // enable logging
+        //BasicConfigurator.configure();
         long start = System.currentTimeMillis();
-        ObjectMapper mapper = JacksonExecutionQueueImpl.configureMapper();
+        ObjectMapper mapper = new ObjectMapper(); 
+        JacksonExecutionQueueImpl.configureMapper(mapper);
+
         JacobVPU vpu = new JacobVPU();
         JacksonExecutionQueueImpl queue = new JacksonExecutionQueueImpl();
         vpu.setContext(queue);
         vpu.inject(new HelloWorld());
         while (vpu.execute()) {
-            queue = loadAndRestoreQueue(mapper, queue);
+            queue = loadAndRestoreQueue(mapper, (JacksonExecutionQueueImpl)vpu.getContext());
             vpu.setContext(queue);
             System.out.println(vpu.isComplete() ? "<0>" : ".");
             //vpu.dumpState();
@@ -258,13 +274,10 @@ public class HelloWorld extends JacobRunnable {
     }
 
     public static JacksonExecutionQueueImpl loadAndRestoreQueue(ObjectMapper mapper, JacksonExecutionQueueImpl in) throws Exception {
-        String json = mapper.writeValueAsString(in);
-        // System.out.println(json);
+        byte[] json = mapper.writeValueAsBytes(in);
+        // print json
+        //System.out.println(new String(json));
         JacksonExecutionQueueImpl q2 = mapper.readValue(json, JacksonExecutionQueueImpl.class);
-        //String json2 = mapper.writeValueAsString(q2);
-
-        //    	System.out.println("----");
-        //    	System.out.println(json2);
         return q2;
     }
 

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/0f7d9afd/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java b/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java
index df4a785..9921267 100644
--- a/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java
+++ b/src/test/java/org/apache/ode/jacob/examples/sequence/Sequence.java
@@ -22,14 +22,15 @@ import org.apache.ode.jacob.JacobRunnable;
 import org.apache.ode.jacob.ReceiveProcess;
 import org.apache.ode.jacob.Synch;
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
 /**
  * Abstract process that executes a number of steps sequentially.
  */
 @SuppressWarnings("serial")
 public abstract class Sequence extends JacobRunnable {
-    private int _steps;
-    private int _current;
-    private Synch _done;
+    private final SequenceData data = new SequenceData();
 
     /**
      * Create a {@link Sequence} with a number of steps.
@@ -38,30 +39,20 @@ public abstract class Sequence extends JacobRunnable {
      * @param done synchronous callback
      */
     public Sequence(int steps, Synch done) {
-        _steps = steps;
-        _current = 0;
-        _done = done;
+        data._steps = steps;
+        data._current = 0;
+        data._done = done;
     }
 
-    /**
-     * Process execution block
-     */
     public void run() {
-        if (_current >= _steps) {
-            if (_done != null) {
-                _done.ret();
+        if (data._current >= data._steps) {
+            if (data._done != null) {
+                data._done.ret();
             }
         } else {
             Synch r = newChannel(Synch.class);
-            object(new ReceiveProcess() {
-                private static final long serialVersionUID = -6999108928780639603L;
-            }.setChannel(r).setReceiver(new Synch() {
-                public void ret() {
-                    ++_current;
-                    instance(Sequence.this);
-                }
-            }));
-            instance(doStep(_current, r));
+            object(new SequenceReceiveProcess().setChannel(r).setReceiver(new SequenceSynch(data, this)));
+            instance(doStep(data._current, r));
         }
     }
 
@@ -72,4 +63,27 @@ public abstract class Sequence extends JacobRunnable {
      * @return runnable process
      */
     protected abstract JacobRunnable doStep(int step, Synch done);
+
+    public static class SequenceData {
+        public int _steps;
+        public int _current;
+        public Synch _done;
+        //public Sequence _seq;
+    }
+
+    static class SequenceReceiveProcess extends ReceiveProcess {}
+    static class SequenceSynch implements Synch {
+        private final SequenceData data;
+        private final Sequence parent;
+
+        @JsonCreator
+        public SequenceSynch(@JsonProperty("data") SequenceData data, @JsonProperty("parent") Sequence parent) {
+            this.data = data;
+            this.parent = parent;
+        }
+        public void ret() {
+            ++data._current;
+            instance(parent);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ode-jacob/blob/0f7d9afd/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties
index 9c6599f..91bd688 100644
--- a/src/test/resources/log4j.properties
+++ b/src/test/resources/log4j.properties
@@ -16,10 +16,10 @@
 #
 
 # Set root logger level to WARN and its only appender to CONSOLE
-log4j.rootLogger=WARN, file
+log4j.rootLogger=TRACE, file
 
 # log4j properties to work with command line tools.
-log4j.category.org.apache.ode=INFO
+log4j.category.org.apache.ode=TRACE
 
 # Console appender
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender