You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Chris <cw...@gmail.com> on 2013/09/18 22:57:31 UTC

How to set a header in custom DataFormat?

I implemented a custom DataFormat and in the "unmarshal(Exchange 
exchange, InputStream stream)" implementation I set a header, but upon 
attempting to retrieve the header downstream from the "unmarshal" call,
the header is not there.  Since the pattern is inOnly, I added the 
header to the in Message.  I've done similar with custom Processors and 
in that case, it works.  What is different with DataFormat?  Is there 
any way to set header(s) from DataFormat marshal/unmarshal?

Thanks,

Chris

Re: AW: How to set a header in custom DataFormat?

Posted by Chris <cw...@gmail.com>.

On 9/20/2013 12:40 PM, Dale King wrote:
> On Fri, Sep 20, 2013 at 11:57 AM, Chris <cw...@gmail.com> wrote:
>
>> I have to say, I never saw this documented anywhere nor any example in the
>> book.  Of course I understand that there are a lot of higher priority items
>> to do, so I'm not complaining ;)
>>
>
> Well, it was in the code, isn't that documentation enough? ;-)

Better then nothing, true.  Except my last message where I said
it was impossible to set a new  header from a "marshal" implementation -
I just found a workaround for that too, and that wasn't commented...

Re: AW: How to set a header in custom DataFormat?

Posted by Dale King <da...@gmail.com>.
On Fri, Sep 20, 2013 at 11:57 AM, Chris <cw...@gmail.com> wrote:

> I have to say, I never saw this documented anywhere nor any example in the
> book.  Of course I understand that there are a lot of higher priority items
> to do, so I'm not complaining ;)
>

Well, it was in the code, isn't that documentation enough? ;-)

-- 
Dale King

Re: AW: How to set a header in custom DataFormat?

Posted by Chris <cw...@gmail.com>.
Actually you can just set the new headers on the *out* message for 
*both* marshalling/unmarshalling.  I.e. Regardless of what the code
comment in UnmarshalingProcessor says, you don't need to create a new 
Message and wrap the result.

On 9/20/2013 1:19 PM, Chris wrote:
> Ok, for *marshaling* the workaround is a bit different then
> *unmarshalling*.  For *marshalling*, the solution is to set new headers
> (or attachments, etc.) on the "out" Message. That wasn't documented or
> in the code comments... but, again - not complaining, just glad I got it
> to work.
>
> On 9/20/2013 12:38 PM, Chris wrote:
>> I spoke too soon... It works for unmarshalling, but not for marshalling
>> - it's impossible to set new headers, attachments, etc. from a
>> DataFormat's marshal method.
>>
>> On 9/20/2013 11:57 AM, Chris wrote:
>>> Ok, I solved the issue.  After reading the source code of
>>> org.apache.camel.processor.UnmarshalProcessor, I saw this little piece
>>> of code:
>>>
>>>              } else if (result instanceof Message) {
>>>                  // the dataformat has probably set headers,
>>> attachments, etc. so let's use it as the outbound payload
>>>                  exchange.setOut((Message) result);
>>>
>>> Which means if a DataFormat implementation wants to set headers and/or
>>> attachments, the unmarshal implementation cannot directly return the
>>> results - it must create a new Message and set the results to body of
>>> the new Message and return this new Message instead.
>>>
>>> I have to say, I never saw this documented anywhere nor any example in
>>> the book.  Of course I understand that there are a lot of higher
>>> priority items to do, so I'm not complaining ;)
>>>
>>> Well here's the revised sample DataFormat where you *can* successfully
>>> set new headers in the unmarshal implementation:
>>>
>>> Regards,
>>>
>>> Chris
>>>
>>>
>>> class CustomDataFormat2 implements DataFormat {
>>>
>>>      @Override
>>>      public Object unmarshal(Exchange exchange, InputStream stream)
>>>              throws Exception {
>>>
>>>          // To be able to set new headers and/or attachments, etc.
>>>          // we need to wrap the result in a new Message and return
>>>          // the new Message instead... who knew?
>>>          Message msg = exchange.getIn().copy();
>>>
>>>          List<String> result = new ArrayList<String>();
>>>
>>>          BufferedReader in
>>>              = new BufferedReader(new InputStreamReader(stream));
>>>
>>>          String line = null;
>>>          while((line = in.readLine()) != null) {
>>>              result.add(line);
>>>          }
>>>
>>>          msg.setBody(result);
>>>
>>>          // now it works!
>>>          msg.setHeader("DISAPPEARING_DEMO_HEADER", "Will it be set?");
>>>          return msg;
>>>      }
>>>
>>>      @Override
>>>      public void marshal(Exchange exchange, Object graph, OutputStream
>>> stream)
>>>              throws Exception {
>>>          throw new UnsupportedOperationException("Not implemented.");
>>>      }
>>> }
>>>
>>>
>>> On 9/20/2013 10:55 AM, Chris wrote:
>>>> Jan,
>>>>
>>>> I really appreciate the help, unfortunately your test case does not
>>>> even
>>>> reproduce the issue I am having.  I think I may not have been clear in
>>>> my original message.
>>>>
>>>> The issue is in a custom DataFormat, if a set a *new* header, that
>>>> header will be gone from the Exchange, for the rest of the route,
>>>> down-stream from the unmarshal (or marshal) call.
>>>>
>>>> So the issue is just setting a header and checking it it's still there.
>>>> So the best way to recreate the problem is to create the most simplest
>>>> DataFormat because marshal/unmarshal is not the issue - the issue is
>>>> setting a new header *inside* the custom marshal/unmarshal methods.
>>>> So I
>>>> created a test case whose custom DataFormat does the bare minimum - no
>>>> file I/O, no serialization - just string manipulation.
>>>>
>>>> Maybe you and/or other can have a look and explain why there's the
>>>> issue.
>>>>
>>>> Thanks,
>>>>
>>>> Chris
>>>>
>>>> (the code formatting will be wrecked by the mailing list line length
>>>> limit)
>>>>
>>>> import java.io.BufferedReader;
>>>> import java.io.InputStream;
>>>> import java.io.InputStreamReader;
>>>> import java.io.OutputStream;
>>>> import java.util.ArrayList;
>>>> import java.util.List;
>>>>
>>>>
>>>> import org.apache.camel.Exchange;
>>>> import org.apache.camel.builder.RouteBuilder;
>>>> import org.apache.camel.component.mock.MockEndpoint;
>>>> import org.apache.camel.impl.JndiRegistry;
>>>> import org.apache.camel.spi.DataFormat;
>>>> import org.apache.camel.test.junit4.CamelTestSupport;
>>>> import org.junit.Test;
>>>>
>>>>
>>>> public class SetHeaderDemo extends CamelTestSupport {
>>>>
>>>>      @Test
>>>>      public void setHeaderInDataFormatProblem() throws Exception {
>>>>          MockEndpoint mock = getMockEndpoint("mock:result");
>>>>          mock.expectedMessageCount(1);
>>>>          template.sendBodyAndHeader("direct:start",
>>>> "one\ntwo\nthree\n",
>>>> "DEMO_HEADER", "Hello...");
>>>>
>>>>          // This passes
>>>>          assertExpression(mock.getReceivedExchanges().get(0),
>>>>                  "simple", "${in.header.DEMO_HEADER}", "Hello...");
>>>>
>>>>          // This FAILS... WHY??? *************
>>>>          assertExpression(mock.getReceivedExchanges().get(0),
>>>>              "simple", "${in.header.DISAPPEARING_DEMO_HEADER}",
>>>> "Will it
>>>> be set?");
>>>>
>>>>          assertMockEndpointsSatisfied();
>>>>      }
>>>>
>>>>
>>>>      @Override
>>>>      protected RouteBuilder createRouteBuilder() throws Exception {
>>>>
>>>>          return new RouteBuilder() {
>>>>              @Override
>>>>              public void configure() {
>>>>                  from("direct:start")
>>>>                  .unmarshal("customFmt") // <== will try to add a new
>>>> header to Exchange IN msg
>>>>
>>>> .to("log://dataformat.demo?showAll=true&multiline=true&level=INFO") //
>>>> <== not there
>>>>                  .to("mock:result");
>>>>              }
>>>>          };
>>>>      }
>>>>
>>>>      @Override
>>>>      protected JndiRegistry createRegistry() throws Exception {
>>>>          JndiRegistry registry = super.createRegistry();
>>>>
>>>>          CustomDataFormat fmt = new CustomDataFormat();
>>>>          registry.bind("customFmt", fmt);
>>>>          return registry;
>>>>      }
>>>> }
>>>>
>>>> /**
>>>>   * Simplest, contrived DataFormat impl to demonstrate that it's
>>>> impossible
>>>>   * to set a new header in the DataFormat's implementation methods and
>>>>   * see the newly added header down-stream from the marshal/unmarshal
>>>>   * call(s).
>>>>   *
>>>>   */
>>>> class CustomDataFormat implements DataFormat {
>>>>
>>>>      /**
>>>>       * Expects the body to be a newline-delimited list of strings,
>>>>       * which will be unmarshalled to a string array, whose elements
>>>>       * are the "lines" in the "document".
>>>>       *
>>>>       * Obviously the marshal/unmarshal process is not important - the
>>>>       * issue is that if a new header is added in the DataFormat
>>>> marshal or
>>>>       * unmarshal - it will be GONE after returning.
>>>>       */
>>>>      @Override
>>>>      public Object unmarshal(Exchange exchange, InputStream stream)
>>>>              throws Exception {
>>>>          List<String> result = new ArrayList<String>();
>>>>
>>>>          BufferedReader in
>>>>              = new BufferedReader(new InputStreamReader(stream));
>>>>
>>>>          String line = null;
>>>>          while((line = in.readLine()) != null) {
>>>>              result.add(line);
>>>>          }
>>>>
>>>>          exchange.getIn().setHeader("DISAPPEARING_DEMO_HEADER",
>>>> "Will it
>>>> be set?");
>>>>          return result;
>>>>      }
>>>>
>>>>      @Override
>>>>      public void marshal(Exchange exchange, Object graph, OutputStream
>>>> stream)
>>>>              throws Exception {
>>>>          throw new UnsupportedOperationException("Not implemented.");
>>>>      }
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>> On 9/19/2013 4:13 AM, Jan Matèrne (jhm) wrote:
>>>>> I tried building my own DF and that works (for me)
>>>>>
>>>>> Jan
>>>>>
>>>>>
>>>>> package org.apache.camel.dataformat;
>>>>>
>>>>> import java.io.File;
>>>>> import java.io.FileInputStream;
>>>>> import java.io.FileNotFoundException;
>>>>> import java.io.FileOutputStream;
>>>>> import java.io.InputStream;
>>>>> import java.io.ObjectInputStream;
>>>>> import java.io.ObjectOutputStream;
>>>>> import java.io.OutputStream;
>>>>> import java.util.HashMap;
>>>>> import java.util.Map;
>>>>>
>>>>> import javax.activation.DataHandler;
>>>>>
>>>>> import org.apache.camel.Exchange;
>>>>> import org.apache.camel.Message;
>>>>> import org.apache.camel.Processor;
>>>>> import org.apache.camel.builder.ExchangeBuilder;
>>>>> import org.apache.camel.builder.RouteBuilder;
>>>>> import org.apache.camel.impl.DefaultMessage;
>>>>> import org.apache.camel.spi.DataFormat;
>>>>> import org.apache.camel.test.junit4.CamelTestSupport;
>>>>> import org.apache.camel.util.FileUtil;
>>>>> import org.apache.commons.io.IOUtils;
>>>>> import org.junit.After;
>>>>> import org.junit.Before;
>>>>> import org.junit.Test;
>>>>>
>>>>> public class SaveHeaderTest extends CamelTestSupport {
>>>>>
>>>>>     private static File dataFile = new File("output/message.ser");
>>>>>
>>>>>     DataFormat customDataFormat = new CustomDataFormat();
>>>>>
>>>>>     @After
>>>>>     @Before
>>>>>     public void cleanup() {
>>>>>         FileUtil.deleteFile(dataFile);
>>>>>     }
>>>>>
>>>>>     @SuppressWarnings("unchecked")
>>>>>     @Test
>>>>>     public void save() throws FileNotFoundException, Exception {
>>>>>         File writtenTo = new File("output/message.ser");
>>>>>         assertFalse(writtenTo.exists());
>>>>>         Exchange exchange = ExchangeBuilder.anExchange(context)
>>>>>                 .withBody("Hello World")
>>>>>                 .withHeader("from", "Apache Camel")
>>>>>                 .withHeader("test", "save")
>>>>>                 .withHeader(Exchange.FILE_NAME,
>>>>> "message.ser")
>>>>>                 .build();
>>>>>         template.send("direct:save", exchange);
>>>>>         assertTrue(writtenTo.exists());
>>>>>
>>>>>         // actual
>>>>>         ObjectInputStream in = new ObjectInputStream(new
>>>>> FileInputStream(dataFile));
>>>>>         Object body = in.readObject();
>>>>>         Map<String,Object> headers = (Map<String, Object>)
>>>>> in.readObject();
>>>>>         Map<String,DataHandler> attachments = (Map<String,
>>>>> DataHandler>) in.readObject();
>>>>>         String messageId = (String) in.readObject();
>>>>>         boolean fault = (boolean) in.readObject();
>>>>>
>>>>>         assertEquals("Hello World", body);
>>>>>         assertEquals("Apache Camel", headers.get("from"));
>>>>>         assertEquals("save", headers.get("test"));
>>>>>         assertTrue(attachments.isEmpty());
>>>>>         assertEquals(exchange.getIn().getMessageId(), messageId);
>>>>>         assertFalse(fault);
>>>>>     }
>>>>>
>>>>>     @Test
>>>>>     public void load() throws Exception {
>>>>>         // create test data
>>>>>         ObjectOutputStream out = new ObjectOutputStream(new
>>>>> FileOutputStream(dataFile));
>>>>>         out.writeObject("Hello World");
>>>>>
>>>>>         Map<String,Object> headers = new HashMap<String, Object>();
>>>>>         headers.put("from", "Apache Camel");
>>>>>         headers.put("test", "load");
>>>>>         out.writeObject(headers);
>>>>>
>>>>>         Map<String,DataHandler> attachments = new
>>>>> HashMap<String,DataHandler>();
>>>>>         out.writeObject(attachments);
>>>>>
>>>>>         out.writeObject("message-id");
>>>>>         out.writeObject(false);
>>>>>         out.close();
>>>>>
>>>>>         // load the saved 'exchange'
>>>>>         Exchange loadCommand = ExchangeBuilder.anExchange(context)
>>>>>                 .withBody(dataFile.getAbsolutePath())
>>>>>                 .build();
>>>>>         Exchange response = template.send("direct:load",
>>>>> loadCommand);
>>>>>         assertEquals("Hello World",
>>>>> response.getOut().getBody(String.class));
>>>>>         assertEquals("Apache Camel",
>>>>> response.getOut().getHeader("from"));
>>>>>         assertEquals("load", response.getOut().getHeader("test"));
>>>>>     }
>>>>>
>>>>>     @Test
>>>>>     public void turnaround() {
>>>>>         Exchange exchange = ExchangeBuilder.anExchange(context)
>>>>>                 .withBody("Hello World")
>>>>>                 .withHeader("from", "Apache Camel")
>>>>>                 .build();
>>>>>         Exchange response = template.send("direct:turnaround",
>>>>> exchange);
>>>>>         assertEquals("Hello World",
>>>>> response.getOut().getBody(String.class));
>>>>>         assertEquals("Apache Camel",
>>>>> response.getOut().getHeader("from"));
>>>>>     }
>>>>>
>>>>>
>>>>>     @Override
>>>>>     protected RouteBuilder createRouteBuilder() throws Exception {
>>>>>         return new RouteBuilder() {
>>>>>             @Override
>>>>>             public void configure() throws Exception {
>>>>>                 from("direct:save")
>>>>>                     .marshal(customDataFormat)
>>>>>                     .to("file:output");
>>>>>                 from("direct:load")
>>>>>                     .process(openFile())
>>>>>                     .unmarshal(customDataFormat);
>>>>>                 from("direct:turnaround")
>>>>>                     .marshal(customDataFormat)
>>>>>                     .unmarshal(customDataFormat);
>>>>>             }
>>>>>
>>>>>             private Processor openFile() {
>>>>>                 return new Processor() {
>>>>>                     @Override
>>>>>                     public void process(Exchange
>>>>> exchange) throws Exception {
>>>>>                         String filename =
>>>>> exchange.getIn().getBody(String.class);
>>>>>                         FileInputStream stream = new
>>>>> FileInputStream(filename);
>>>>>
>>>>> exchange.getIn().setBody(stream);
>>>>>                     }
>>>>>                 };
>>>>>             }
>>>>>         };
>>>>>     }
>>>>>
>>>>>     class CustomDataFormat implements DataFormat {
>>>>>
>>>>>         @Override
>>>>>         public void marshal(Exchange exchange, Object graph,
>>>>> OutputStream stream)
>>>>>                 throws Exception {
>>>>>             ObjectOutputStream out = null;
>>>>>
>>>>>             try {
>>>>>                 out = new ObjectOutputStream(stream);
>>>>>                 // Save the body
>>>>>                 out.writeObject(graph);
>>>>>                 // Save the message
>>>>>                 Message message = exchange.getIn();
>>>>>                 if (message != null) {
>>>>>                     // Message is not serializable, so
>>>>> save the data individually
>>>>>
>>>>> out.writeObject(message.getHeaders());
>>>>>
>>>>> out.writeObject(message.getAttachments());
>>>>>
>>>>> out.writeObject(message.getMessageId());
>>>>>                     out.writeObject(message.isFault());
>>>>>                 }
>>>>>             } finally {
>>>>>                 IOUtils.closeQuietly(out);
>>>>>             }
>>>>>         }
>>>>>
>>>>>         @SuppressWarnings("unchecked")
>>>>>         @Override
>>>>>         public Object unmarshal(Exchange exchange, InputStream
>>>>> stream)
>>>>>                 throws Exception {
>>>>>
>>>>>             ObjectInputStream in = null;
>>>>>             try {
>>>>>                 in = new ObjectInputStream(stream);
>>>>>
>>>>>                 // read the raw data
>>>>>                 Object body = in.readObject();
>>>>>                 Map<String,Object> headers = (Map<String,
>>>>> Object>) in.readObject();
>>>>>                 Map<String,DataHandler> attachments =
>>>>> (Map<String, DataHandler>) in.readObject();
>>>>>                 String messageId = (String) in.readObject();
>>>>>                 boolean fault = (boolean) in.readObject();
>>>>>
>>>>>                 // build the message
>>>>>                 Message msg = new DefaultMessage();
>>>>>                 msg.setBody(body);
>>>>>                 msg.setAttachments(attachments);
>>>>>                 msg.setHeaders(headers);
>>>>>                 msg.setMessageId(messageId);
>>>>>                 msg.setFault(fault);
>>>>>
>>>>>                 return msg;
>>>>>             } finally {
>>>>>                 IOUtils.closeQuietly(in);
>>>>>             }
>>>>>         }
>>>>>     }
>>>>>
>>>>> }
>>>>>
>>>>>> -----Ursprüngliche Nachricht-----
>>>>>> Von: Jan Matèrne (jhm) [mailto:apache@materne.de]
>>>>>> Gesendet: Donnerstag, 19. September 2013 07:06
>>>>>> An: users@camel.apache.org
>>>>>> Betreff: AW: How to set a header in custom DataFormat?
>>>>>>
>>>>>> What I have found is, that the exchange object which is passed to the
>>>>>> DataFormat is only used for getting the CamelContext.
>>>>>> I searched a little bit further and found a processor for
>>>>>> unmarshalling:
>>>>>>
>>>>>> org.apache.camel.processor.UnmarshalProcessor.process(Exchange,
>>>>>> AsyncCallback)
>>>>>>
>>>>>> And there is a note
>>>>>>
>>>>>>    Object result = dataFormat.unmarshal(exchange, stream);
>>>>>>      if (result instanceof Exchange) {
>>>>>>        if (result != exchange) {
>>>>>>          // it's not allowed to return another exchange other than
>>>>>> the
>>>>>> one provided to dataFormat
>>>>>>          throw new RuntimeCamelException("The returned exchange " +
>>>>>> result + " is not the same as " + exchange + " provided to the
>>>>>> DataFormat");
>>>>>>
>>>>>>
>>>>>> The logic of that processor is:
>>>>>> - get the object from the stream
>>>>>> - if it is an exchange, throw that exception
>>>>>> - if it is a message, store as 'out' on the exchange-parameter
>>>>>> - if it is something else, store it as body of the 'out' message
>>>>>>
>>>>>>
>>>>>> So have you tried setting the header on the 'out' message?
>>>>>>
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>>
>>>>>>
>>>>>>> -----Ursprüngliche Nachricht-----
>>>>>>> Von: Chris [mailto:cwolf.algo@gmail.com]
>>>>>>> Gesendet: Mittwoch, 18. September 2013 22:58
>>>>>>> An: users@camel.apache.org
>>>>>>> Betreff: How to set a header in custom DataFormat?
>>>>>>>
>>>>>>> I implemented a custom DataFormat and in the "unmarshal(Exchange
>>>>>>> exchange, InputStream stream)" implementation I set a header, but
>>>>>> upon
>>>>>>> attempting to retrieve the header downstream from the "unmarshal"
>>>>>>> call, the header is not there.  Since the pattern is inOnly, I added
>>>>>>> the header to the in Message.  I've done similar with custom
>>>>>>> Processors and in that case, it works.  What is different with
>>>>>>> DataFormat?  Is there any way to set header(s) from DataFormat
>>>>>> marshal/unmarshal?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Chris
>>>>>
>>>>>

Re: AW: How to set a header in custom DataFormat?

Posted by Chris <cw...@gmail.com>.
Ok, for *marshaling* the workaround is a bit different then 
*unmarshalling*.  For *marshalling*, the solution is to set new headers 
(or attachments, etc.) on the "out" Message. That wasn't documented or 
in the code comments... but, again - not complaining, just glad I got it 
to work.

On 9/20/2013 12:38 PM, Chris wrote:
> I spoke too soon... It works for unmarshalling, but not for marshalling
> - it's impossible to set new headers, attachments, etc. from a
> DataFormat's marshal method.
>
> On 9/20/2013 11:57 AM, Chris wrote:
>> Ok, I solved the issue.  After reading the source code of
>> org.apache.camel.processor.UnmarshalProcessor, I saw this little piece
>> of code:
>>
>>              } else if (result instanceof Message) {
>>                  // the dataformat has probably set headers,
>> attachments, etc. so let's use it as the outbound payload
>>                  exchange.setOut((Message) result);
>>
>> Which means if a DataFormat implementation wants to set headers and/or
>> attachments, the unmarshal implementation cannot directly return the
>> results - it must create a new Message and set the results to body of
>> the new Message and return this new Message instead.
>>
>> I have to say, I never saw this documented anywhere nor any example in
>> the book.  Of course I understand that there are a lot of higher
>> priority items to do, so I'm not complaining ;)
>>
>> Well here's the revised sample DataFormat where you *can* successfully
>> set new headers in the unmarshal implementation:
>>
>> Regards,
>>
>> Chris
>>
>>
>> class CustomDataFormat2 implements DataFormat {
>>
>>      @Override
>>      public Object unmarshal(Exchange exchange, InputStream stream)
>>              throws Exception {
>>
>>          // To be able to set new headers and/or attachments, etc.
>>          // we need to wrap the result in a new Message and return
>>          // the new Message instead... who knew?
>>          Message msg = exchange.getIn().copy();
>>
>>          List<String> result = new ArrayList<String>();
>>
>>          BufferedReader in
>>              = new BufferedReader(new InputStreamReader(stream));
>>
>>          String line = null;
>>          while((line = in.readLine()) != null) {
>>              result.add(line);
>>          }
>>
>>          msg.setBody(result);
>>
>>          // now it works!
>>          msg.setHeader("DISAPPEARING_DEMO_HEADER", "Will it be set?");
>>          return msg;
>>      }
>>
>>      @Override
>>      public void marshal(Exchange exchange, Object graph, OutputStream
>> stream)
>>              throws Exception {
>>          throw new UnsupportedOperationException("Not implemented.");
>>      }
>> }
>>
>>
>> On 9/20/2013 10:55 AM, Chris wrote:
>>> Jan,
>>>
>>> I really appreciate the help, unfortunately your test case does not even
>>> reproduce the issue I am having.  I think I may not have been clear in
>>> my original message.
>>>
>>> The issue is in a custom DataFormat, if a set a *new* header, that
>>> header will be gone from the Exchange, for the rest of the route,
>>> down-stream from the unmarshal (or marshal) call.
>>>
>>> So the issue is just setting a header and checking it it's still there.
>>> So the best way to recreate the problem is to create the most simplest
>>> DataFormat because marshal/unmarshal is not the issue - the issue is
>>> setting a new header *inside* the custom marshal/unmarshal methods. So I
>>> created a test case whose custom DataFormat does the bare minimum - no
>>> file I/O, no serialization - just string manipulation.
>>>
>>> Maybe you and/or other can have a look and explain why there's the
>>> issue.
>>>
>>> Thanks,
>>>
>>> Chris
>>>
>>> (the code formatting will be wrecked by the mailing list line length
>>> limit)
>>>
>>> import java.io.BufferedReader;
>>> import java.io.InputStream;
>>> import java.io.InputStreamReader;
>>> import java.io.OutputStream;
>>> import java.util.ArrayList;
>>> import java.util.List;
>>>
>>>
>>> import org.apache.camel.Exchange;
>>> import org.apache.camel.builder.RouteBuilder;
>>> import org.apache.camel.component.mock.MockEndpoint;
>>> import org.apache.camel.impl.JndiRegistry;
>>> import org.apache.camel.spi.DataFormat;
>>> import org.apache.camel.test.junit4.CamelTestSupport;
>>> import org.junit.Test;
>>>
>>>
>>> public class SetHeaderDemo extends CamelTestSupport {
>>>
>>>      @Test
>>>      public void setHeaderInDataFormatProblem() throws Exception {
>>>          MockEndpoint mock = getMockEndpoint("mock:result");
>>>          mock.expectedMessageCount(1);
>>>          template.sendBodyAndHeader("direct:start", "one\ntwo\nthree\n",
>>> "DEMO_HEADER", "Hello...");
>>>
>>>          // This passes
>>>          assertExpression(mock.getReceivedExchanges().get(0),
>>>                  "simple", "${in.header.DEMO_HEADER}", "Hello...");
>>>
>>>          // This FAILS... WHY??? *************
>>>          assertExpression(mock.getReceivedExchanges().get(0),
>>>              "simple", "${in.header.DISAPPEARING_DEMO_HEADER}", "Will it
>>> be set?");
>>>
>>>          assertMockEndpointsSatisfied();
>>>      }
>>>
>>>
>>>      @Override
>>>      protected RouteBuilder createRouteBuilder() throws Exception {
>>>
>>>          return new RouteBuilder() {
>>>              @Override
>>>              public void configure() {
>>>                  from("direct:start")
>>>                  .unmarshal("customFmt") // <== will try to add a new
>>> header to Exchange IN msg
>>>
>>> .to("log://dataformat.demo?showAll=true&multiline=true&level=INFO") //
>>> <== not there
>>>                  .to("mock:result");
>>>              }
>>>          };
>>>      }
>>>
>>>      @Override
>>>      protected JndiRegistry createRegistry() throws Exception {
>>>          JndiRegistry registry = super.createRegistry();
>>>
>>>          CustomDataFormat fmt = new CustomDataFormat();
>>>          registry.bind("customFmt", fmt);
>>>          return registry;
>>>      }
>>> }
>>>
>>> /**
>>>   * Simplest, contrived DataFormat impl to demonstrate that it's
>>> impossible
>>>   * to set a new header in the DataFormat's implementation methods and
>>>   * see the newly added header down-stream from the marshal/unmarshal
>>>   * call(s).
>>>   *
>>>   */
>>> class CustomDataFormat implements DataFormat {
>>>
>>>      /**
>>>       * Expects the body to be a newline-delimited list of strings,
>>>       * which will be unmarshalled to a string array, whose elements
>>>       * are the "lines" in the "document".
>>>       *
>>>       * Obviously the marshal/unmarshal process is not important - the
>>>       * issue is that if a new header is added in the DataFormat
>>> marshal or
>>>       * unmarshal - it will be GONE after returning.
>>>       */
>>>      @Override
>>>      public Object unmarshal(Exchange exchange, InputStream stream)
>>>              throws Exception {
>>>          List<String> result = new ArrayList<String>();
>>>
>>>          BufferedReader in
>>>              = new BufferedReader(new InputStreamReader(stream));
>>>
>>>          String line = null;
>>>          while((line = in.readLine()) != null) {
>>>              result.add(line);
>>>          }
>>>
>>>          exchange.getIn().setHeader("DISAPPEARING_DEMO_HEADER", "Will it
>>> be set?");
>>>          return result;
>>>      }
>>>
>>>      @Override
>>>      public void marshal(Exchange exchange, Object graph, OutputStream
>>> stream)
>>>              throws Exception {
>>>          throw new UnsupportedOperationException("Not implemented.");
>>>      }
>>> }
>>>
>>>
>>>
>>>
>>> On 9/19/2013 4:13 AM, Jan Matèrne (jhm) wrote:
>>>> I tried building my own DF and that works (for me)
>>>>
>>>> Jan
>>>>
>>>>
>>>> package org.apache.camel.dataformat;
>>>>
>>>> import java.io.File;
>>>> import java.io.FileInputStream;
>>>> import java.io.FileNotFoundException;
>>>> import java.io.FileOutputStream;
>>>> import java.io.InputStream;
>>>> import java.io.ObjectInputStream;
>>>> import java.io.ObjectOutputStream;
>>>> import java.io.OutputStream;
>>>> import java.util.HashMap;
>>>> import java.util.Map;
>>>>
>>>> import javax.activation.DataHandler;
>>>>
>>>> import org.apache.camel.Exchange;
>>>> import org.apache.camel.Message;
>>>> import org.apache.camel.Processor;
>>>> import org.apache.camel.builder.ExchangeBuilder;
>>>> import org.apache.camel.builder.RouteBuilder;
>>>> import org.apache.camel.impl.DefaultMessage;
>>>> import org.apache.camel.spi.DataFormat;
>>>> import org.apache.camel.test.junit4.CamelTestSupport;
>>>> import org.apache.camel.util.FileUtil;
>>>> import org.apache.commons.io.IOUtils;
>>>> import org.junit.After;
>>>> import org.junit.Before;
>>>> import org.junit.Test;
>>>>
>>>> public class SaveHeaderTest extends CamelTestSupport {
>>>>
>>>>     private static File dataFile = new File("output/message.ser");
>>>>
>>>>     DataFormat customDataFormat = new CustomDataFormat();
>>>>
>>>>     @After
>>>>     @Before
>>>>     public void cleanup() {
>>>>         FileUtil.deleteFile(dataFile);
>>>>     }
>>>>
>>>>     @SuppressWarnings("unchecked")
>>>>     @Test
>>>>     public void save() throws FileNotFoundException, Exception {
>>>>         File writtenTo = new File("output/message.ser");
>>>>         assertFalse(writtenTo.exists());
>>>>         Exchange exchange = ExchangeBuilder.anExchange(context)
>>>>                 .withBody("Hello World")
>>>>                 .withHeader("from", "Apache Camel")
>>>>                 .withHeader("test", "save")
>>>>                 .withHeader(Exchange.FILE_NAME,
>>>> "message.ser")
>>>>                 .build();
>>>>         template.send("direct:save", exchange);
>>>>         assertTrue(writtenTo.exists());
>>>>
>>>>         // actual
>>>>         ObjectInputStream in = new ObjectInputStream(new
>>>> FileInputStream(dataFile));
>>>>         Object body = in.readObject();
>>>>         Map<String,Object> headers = (Map<String, Object>)
>>>> in.readObject();
>>>>         Map<String,DataHandler> attachments = (Map<String,
>>>> DataHandler>) in.readObject();
>>>>         String messageId = (String) in.readObject();
>>>>         boolean fault = (boolean) in.readObject();
>>>>
>>>>         assertEquals("Hello World", body);
>>>>         assertEquals("Apache Camel", headers.get("from"));
>>>>         assertEquals("save", headers.get("test"));
>>>>         assertTrue(attachments.isEmpty());
>>>>         assertEquals(exchange.getIn().getMessageId(), messageId);
>>>>         assertFalse(fault);
>>>>     }
>>>>
>>>>     @Test
>>>>     public void load() throws Exception {
>>>>         // create test data
>>>>         ObjectOutputStream out = new ObjectOutputStream(new
>>>> FileOutputStream(dataFile));
>>>>         out.writeObject("Hello World");
>>>>
>>>>         Map<String,Object> headers = new HashMap<String, Object>();
>>>>         headers.put("from", "Apache Camel");
>>>>         headers.put("test", "load");
>>>>         out.writeObject(headers);
>>>>
>>>>         Map<String,DataHandler> attachments = new
>>>> HashMap<String,DataHandler>();
>>>>         out.writeObject(attachments);
>>>>
>>>>         out.writeObject("message-id");
>>>>         out.writeObject(false);
>>>>         out.close();
>>>>
>>>>         // load the saved 'exchange'
>>>>         Exchange loadCommand = ExchangeBuilder.anExchange(context)
>>>>                 .withBody(dataFile.getAbsolutePath())
>>>>                 .build();
>>>>         Exchange response = template.send("direct:load",
>>>> loadCommand);
>>>>         assertEquals("Hello World",
>>>> response.getOut().getBody(String.class));
>>>>         assertEquals("Apache Camel",
>>>> response.getOut().getHeader("from"));
>>>>         assertEquals("load", response.getOut().getHeader("test"));
>>>>     }
>>>>
>>>>     @Test
>>>>     public void turnaround() {
>>>>         Exchange exchange = ExchangeBuilder.anExchange(context)
>>>>                 .withBody("Hello World")
>>>>                 .withHeader("from", "Apache Camel")
>>>>                 .build();
>>>>         Exchange response = template.send("direct:turnaround",
>>>> exchange);
>>>>         assertEquals("Hello World",
>>>> response.getOut().getBody(String.class));
>>>>         assertEquals("Apache Camel",
>>>> response.getOut().getHeader("from"));
>>>>     }
>>>>
>>>>
>>>>     @Override
>>>>     protected RouteBuilder createRouteBuilder() throws Exception {
>>>>         return new RouteBuilder() {
>>>>             @Override
>>>>             public void configure() throws Exception {
>>>>                 from("direct:save")
>>>>                     .marshal(customDataFormat)
>>>>                     .to("file:output");
>>>>                 from("direct:load")
>>>>                     .process(openFile())
>>>>                     .unmarshal(customDataFormat);
>>>>                 from("direct:turnaround")
>>>>                     .marshal(customDataFormat)
>>>>                     .unmarshal(customDataFormat);
>>>>             }
>>>>
>>>>             private Processor openFile() {
>>>>                 return new Processor() {
>>>>                     @Override
>>>>                     public void process(Exchange
>>>> exchange) throws Exception {
>>>>                         String filename =
>>>> exchange.getIn().getBody(String.class);
>>>>                         FileInputStream stream = new
>>>> FileInputStream(filename);
>>>>
>>>> exchange.getIn().setBody(stream);
>>>>                     }
>>>>                 };
>>>>             }
>>>>         };
>>>>     }
>>>>
>>>>     class CustomDataFormat implements DataFormat {
>>>>
>>>>         @Override
>>>>         public void marshal(Exchange exchange, Object graph,
>>>> OutputStream stream)
>>>>                 throws Exception {
>>>>             ObjectOutputStream out = null;
>>>>
>>>>             try {
>>>>                 out = new ObjectOutputStream(stream);
>>>>                 // Save the body
>>>>                 out.writeObject(graph);
>>>>                 // Save the message
>>>>                 Message message = exchange.getIn();
>>>>                 if (message != null) {
>>>>                     // Message is not serializable, so
>>>> save the data individually
>>>>
>>>> out.writeObject(message.getHeaders());
>>>>
>>>> out.writeObject(message.getAttachments());
>>>>
>>>> out.writeObject(message.getMessageId());
>>>>                     out.writeObject(message.isFault());
>>>>                 }
>>>>             } finally {
>>>>                 IOUtils.closeQuietly(out);
>>>>             }
>>>>         }
>>>>
>>>>         @SuppressWarnings("unchecked")
>>>>         @Override
>>>>         public Object unmarshal(Exchange exchange, InputStream
>>>> stream)
>>>>                 throws Exception {
>>>>
>>>>             ObjectInputStream in = null;
>>>>             try {
>>>>                 in = new ObjectInputStream(stream);
>>>>
>>>>                 // read the raw data
>>>>                 Object body = in.readObject();
>>>>                 Map<String,Object> headers = (Map<String,
>>>> Object>) in.readObject();
>>>>                 Map<String,DataHandler> attachments =
>>>> (Map<String, DataHandler>) in.readObject();
>>>>                 String messageId = (String) in.readObject();
>>>>                 boolean fault = (boolean) in.readObject();
>>>>
>>>>                 // build the message
>>>>                 Message msg = new DefaultMessage();
>>>>                 msg.setBody(body);
>>>>                 msg.setAttachments(attachments);
>>>>                 msg.setHeaders(headers);
>>>>                 msg.setMessageId(messageId);
>>>>                 msg.setFault(fault);
>>>>
>>>>                 return msg;
>>>>             } finally {
>>>>                 IOUtils.closeQuietly(in);
>>>>             }
>>>>         }
>>>>     }
>>>>
>>>> }
>>>>
>>>>> -----Ursprüngliche Nachricht-----
>>>>> Von: Jan Matèrne (jhm) [mailto:apache@materne.de]
>>>>> Gesendet: Donnerstag, 19. September 2013 07:06
>>>>> An: users@camel.apache.org
>>>>> Betreff: AW: How to set a header in custom DataFormat?
>>>>>
>>>>> What I have found is, that the exchange object which is passed to the
>>>>> DataFormat is only used for getting the CamelContext.
>>>>> I searched a little bit further and found a processor for
>>>>> unmarshalling:
>>>>>
>>>>> org.apache.camel.processor.UnmarshalProcessor.process(Exchange,
>>>>> AsyncCallback)
>>>>>
>>>>> And there is a note
>>>>>
>>>>>    Object result = dataFormat.unmarshal(exchange, stream);
>>>>>      if (result instanceof Exchange) {
>>>>>        if (result != exchange) {
>>>>>          // it's not allowed to return another exchange other than the
>>>>> one provided to dataFormat
>>>>>          throw new RuntimeCamelException("The returned exchange " +
>>>>> result + " is not the same as " + exchange + " provided to the
>>>>> DataFormat");
>>>>>
>>>>>
>>>>> The logic of that processor is:
>>>>> - get the object from the stream
>>>>> - if it is an exchange, throw that exception
>>>>> - if it is a message, store as 'out' on the exchange-parameter
>>>>> - if it is something else, store it as body of the 'out' message
>>>>>
>>>>>
>>>>> So have you tried setting the header on the 'out' message?
>>>>>
>>>>>
>>>>> Jan
>>>>>
>>>>>
>>>>>
>>>>>> -----Ursprüngliche Nachricht-----
>>>>>> Von: Chris [mailto:cwolf.algo@gmail.com]
>>>>>> Gesendet: Mittwoch, 18. September 2013 22:58
>>>>>> An: users@camel.apache.org
>>>>>> Betreff: How to set a header in custom DataFormat?
>>>>>>
>>>>>> I implemented a custom DataFormat and in the "unmarshal(Exchange
>>>>>> exchange, InputStream stream)" implementation I set a header, but
>>>>> upon
>>>>>> attempting to retrieve the header downstream from the "unmarshal"
>>>>>> call, the header is not there.  Since the pattern is inOnly, I added
>>>>>> the header to the in Message.  I've done similar with custom
>>>>>> Processors and in that case, it works.  What is different with
>>>>>> DataFormat?  Is there any way to set header(s) from DataFormat
>>>>> marshal/unmarshal?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Chris
>>>>
>>>>

Re: AW: How to set a header in custom DataFormat?

Posted by Chris <cw...@gmail.com>.
I spoke too soon... It works for unmarshalling, but not for marshalling 
- it's impossible to set new headers, attachments, etc. from a 
DataFormat's marshal method.

On 9/20/2013 11:57 AM, Chris wrote:
> Ok, I solved the issue.  After reading the source code of
> org.apache.camel.processor.UnmarshalProcessor, I saw this little piece
> of code:
>
>              } else if (result instanceof Message) {
>                  // the dataformat has probably set headers,
> attachments, etc. so let's use it as the outbound payload
>                  exchange.setOut((Message) result);
>
> Which means if a DataFormat implementation wants to set headers and/or
> attachments, the unmarshal implementation cannot directly return the
> results - it must create a new Message and set the results to body of
> the new Message and return this new Message instead.
>
> I have to say, I never saw this documented anywhere nor any example in
> the book.  Of course I understand that there are a lot of higher
> priority items to do, so I'm not complaining ;)
>
> Well here's the revised sample DataFormat where you *can* successfully
> set new headers in the unmarshal implementation:
>
> Regards,
>
> Chris
>
>
> class CustomDataFormat2 implements DataFormat {
>
>      @Override
>      public Object unmarshal(Exchange exchange, InputStream stream)
>              throws Exception {
>
>          // To be able to set new headers and/or attachments, etc.
>          // we need to wrap the result in a new Message and return
>          // the new Message instead... who knew?
>          Message msg = exchange.getIn().copy();
>
>          List<String> result = new ArrayList<String>();
>
>          BufferedReader in
>              = new BufferedReader(new InputStreamReader(stream));
>
>          String line = null;
>          while((line = in.readLine()) != null) {
>              result.add(line);
>          }
>
>          msg.setBody(result);
>
>          // now it works!
>          msg.setHeader("DISAPPEARING_DEMO_HEADER", "Will it be set?");
>          return msg;
>      }
>
>      @Override
>      public void marshal(Exchange exchange, Object graph, OutputStream
> stream)
>              throws Exception {
>          throw new UnsupportedOperationException("Not implemented.");
>      }
> }
>
>
> On 9/20/2013 10:55 AM, Chris wrote:
>> Jan,
>>
>> I really appreciate the help, unfortunately your test case does not even
>> reproduce the issue I am having.  I think I may not have been clear in
>> my original message.
>>
>> The issue is in a custom DataFormat, if a set a *new* header, that
>> header will be gone from the Exchange, for the rest of the route,
>> down-stream from the unmarshal (or marshal) call.
>>
>> So the issue is just setting a header and checking it it's still there.
>> So the best way to recreate the problem is to create the most simplest
>> DataFormat because marshal/unmarshal is not the issue - the issue is
>> setting a new header *inside* the custom marshal/unmarshal methods. So I
>> created a test case whose custom DataFormat does the bare minimum - no
>> file I/O, no serialization - just string manipulation.
>>
>> Maybe you and/or other can have a look and explain why there's the issue.
>>
>> Thanks,
>>
>> Chris
>>
>> (the code formatting will be wrecked by the mailing list line length
>> limit)
>>
>> import java.io.BufferedReader;
>> import java.io.InputStream;
>> import java.io.InputStreamReader;
>> import java.io.OutputStream;
>> import java.util.ArrayList;
>> import java.util.List;
>>
>>
>> import org.apache.camel.Exchange;
>> import org.apache.camel.builder.RouteBuilder;
>> import org.apache.camel.component.mock.MockEndpoint;
>> import org.apache.camel.impl.JndiRegistry;
>> import org.apache.camel.spi.DataFormat;
>> import org.apache.camel.test.junit4.CamelTestSupport;
>> import org.junit.Test;
>>
>>
>> public class SetHeaderDemo extends CamelTestSupport {
>>
>>      @Test
>>      public void setHeaderInDataFormatProblem() throws Exception {
>>          MockEndpoint mock = getMockEndpoint("mock:result");
>>          mock.expectedMessageCount(1);
>>          template.sendBodyAndHeader("direct:start", "one\ntwo\nthree\n",
>> "DEMO_HEADER", "Hello...");
>>
>>          // This passes
>>          assertExpression(mock.getReceivedExchanges().get(0),
>>                  "simple", "${in.header.DEMO_HEADER}", "Hello...");
>>
>>          // This FAILS... WHY??? *************
>>          assertExpression(mock.getReceivedExchanges().get(0),
>>              "simple", "${in.header.DISAPPEARING_DEMO_HEADER}", "Will it
>> be set?");
>>
>>          assertMockEndpointsSatisfied();
>>      }
>>
>>
>>      @Override
>>      protected RouteBuilder createRouteBuilder() throws Exception {
>>
>>          return new RouteBuilder() {
>>              @Override
>>              public void configure() {
>>                  from("direct:start")
>>                  .unmarshal("customFmt") // <== will try to add a new
>> header to Exchange IN msg
>>
>> .to("log://dataformat.demo?showAll=true&multiline=true&level=INFO") //
>> <== not there
>>                  .to("mock:result");
>>              }
>>          };
>>      }
>>
>>      @Override
>>      protected JndiRegistry createRegistry() throws Exception {
>>          JndiRegistry registry = super.createRegistry();
>>
>>          CustomDataFormat fmt = new CustomDataFormat();
>>          registry.bind("customFmt", fmt);
>>          return registry;
>>      }
>> }
>>
>> /**
>>   * Simplest, contrived DataFormat impl to demonstrate that it's
>> impossible
>>   * to set a new header in the DataFormat's implementation methods and
>>   * see the newly added header down-stream from the marshal/unmarshal
>>   * call(s).
>>   *
>>   */
>> class CustomDataFormat implements DataFormat {
>>
>>      /**
>>       * Expects the body to be a newline-delimited list of strings,
>>       * which will be unmarshalled to a string array, whose elements
>>       * are the "lines" in the "document".
>>       *
>>       * Obviously the marshal/unmarshal process is not important - the
>>       * issue is that if a new header is added in the DataFormat
>> marshal or
>>       * unmarshal - it will be GONE after returning.
>>       */
>>      @Override
>>      public Object unmarshal(Exchange exchange, InputStream stream)
>>              throws Exception {
>>          List<String> result = new ArrayList<String>();
>>
>>          BufferedReader in
>>              = new BufferedReader(new InputStreamReader(stream));
>>
>>          String line = null;
>>          while((line = in.readLine()) != null) {
>>              result.add(line);
>>          }
>>
>>          exchange.getIn().setHeader("DISAPPEARING_DEMO_HEADER", "Will it
>> be set?");
>>          return result;
>>      }
>>
>>      @Override
>>      public void marshal(Exchange exchange, Object graph, OutputStream
>> stream)
>>              throws Exception {
>>          throw new UnsupportedOperationException("Not implemented.");
>>      }
>> }
>>
>>
>>
>>
>> On 9/19/2013 4:13 AM, Jan Matèrne (jhm) wrote:
>>> I tried building my own DF and that works (for me)
>>>
>>> Jan
>>>
>>>
>>> package org.apache.camel.dataformat;
>>>
>>> import java.io.File;
>>> import java.io.FileInputStream;
>>> import java.io.FileNotFoundException;
>>> import java.io.FileOutputStream;
>>> import java.io.InputStream;
>>> import java.io.ObjectInputStream;
>>> import java.io.ObjectOutputStream;
>>> import java.io.OutputStream;
>>> import java.util.HashMap;
>>> import java.util.Map;
>>>
>>> import javax.activation.DataHandler;
>>>
>>> import org.apache.camel.Exchange;
>>> import org.apache.camel.Message;
>>> import org.apache.camel.Processor;
>>> import org.apache.camel.builder.ExchangeBuilder;
>>> import org.apache.camel.builder.RouteBuilder;
>>> import org.apache.camel.impl.DefaultMessage;
>>> import org.apache.camel.spi.DataFormat;
>>> import org.apache.camel.test.junit4.CamelTestSupport;
>>> import org.apache.camel.util.FileUtil;
>>> import org.apache.commons.io.IOUtils;
>>> import org.junit.After;
>>> import org.junit.Before;
>>> import org.junit.Test;
>>>
>>> public class SaveHeaderTest extends CamelTestSupport {
>>>
>>>     private static File dataFile = new File("output/message.ser");
>>>
>>>     DataFormat customDataFormat = new CustomDataFormat();
>>>
>>>     @After
>>>     @Before
>>>     public void cleanup() {
>>>         FileUtil.deleteFile(dataFile);
>>>     }
>>>
>>>     @SuppressWarnings("unchecked")
>>>     @Test
>>>     public void save() throws FileNotFoundException, Exception {
>>>         File writtenTo = new File("output/message.ser");
>>>         assertFalse(writtenTo.exists());
>>>         Exchange exchange = ExchangeBuilder.anExchange(context)
>>>                 .withBody("Hello World")
>>>                 .withHeader("from", "Apache Camel")
>>>                 .withHeader("test", "save")
>>>                 .withHeader(Exchange.FILE_NAME,
>>> "message.ser")
>>>                 .build();
>>>         template.send("direct:save", exchange);
>>>         assertTrue(writtenTo.exists());
>>>
>>>         // actual
>>>         ObjectInputStream in = new ObjectInputStream(new
>>> FileInputStream(dataFile));
>>>         Object body = in.readObject();
>>>         Map<String,Object> headers = (Map<String, Object>)
>>> in.readObject();
>>>         Map<String,DataHandler> attachments = (Map<String,
>>> DataHandler>) in.readObject();
>>>         String messageId = (String) in.readObject();
>>>         boolean fault = (boolean) in.readObject();
>>>
>>>         assertEquals("Hello World", body);
>>>         assertEquals("Apache Camel", headers.get("from"));
>>>         assertEquals("save", headers.get("test"));
>>>         assertTrue(attachments.isEmpty());
>>>         assertEquals(exchange.getIn().getMessageId(), messageId);
>>>         assertFalse(fault);
>>>     }
>>>
>>>     @Test
>>>     public void load() throws Exception {
>>>         // create test data
>>>         ObjectOutputStream out = new ObjectOutputStream(new
>>> FileOutputStream(dataFile));
>>>         out.writeObject("Hello World");
>>>
>>>         Map<String,Object> headers = new HashMap<String, Object>();
>>>         headers.put("from", "Apache Camel");
>>>         headers.put("test", "load");
>>>         out.writeObject(headers);
>>>
>>>         Map<String,DataHandler> attachments = new
>>> HashMap<String,DataHandler>();
>>>         out.writeObject(attachments);
>>>
>>>         out.writeObject("message-id");
>>>         out.writeObject(false);
>>>         out.close();
>>>
>>>         // load the saved 'exchange'
>>>         Exchange loadCommand = ExchangeBuilder.anExchange(context)
>>>                 .withBody(dataFile.getAbsolutePath())
>>>                 .build();
>>>         Exchange response = template.send("direct:load",
>>> loadCommand);
>>>         assertEquals("Hello World",
>>> response.getOut().getBody(String.class));
>>>         assertEquals("Apache Camel",
>>> response.getOut().getHeader("from"));
>>>         assertEquals("load", response.getOut().getHeader("test"));
>>>     }
>>>
>>>     @Test
>>>     public void turnaround() {
>>>         Exchange exchange = ExchangeBuilder.anExchange(context)
>>>                 .withBody("Hello World")
>>>                 .withHeader("from", "Apache Camel")
>>>                 .build();
>>>         Exchange response = template.send("direct:turnaround",
>>> exchange);
>>>         assertEquals("Hello World",
>>> response.getOut().getBody(String.class));
>>>         assertEquals("Apache Camel",
>>> response.getOut().getHeader("from"));
>>>     }
>>>
>>>
>>>     @Override
>>>     protected RouteBuilder createRouteBuilder() throws Exception {
>>>         return new RouteBuilder() {
>>>             @Override
>>>             public void configure() throws Exception {
>>>                 from("direct:save")
>>>                     .marshal(customDataFormat)
>>>                     .to("file:output");
>>>                 from("direct:load")
>>>                     .process(openFile())
>>>                     .unmarshal(customDataFormat);
>>>                 from("direct:turnaround")
>>>                     .marshal(customDataFormat)
>>>                     .unmarshal(customDataFormat);
>>>             }
>>>
>>>             private Processor openFile() {
>>>                 return new Processor() {
>>>                     @Override
>>>                     public void process(Exchange
>>> exchange) throws Exception {
>>>                         String filename =
>>> exchange.getIn().getBody(String.class);
>>>                         FileInputStream stream = new
>>> FileInputStream(filename);
>>>
>>> exchange.getIn().setBody(stream);
>>>                     }
>>>                 };
>>>             }
>>>         };
>>>     }
>>>
>>>     class CustomDataFormat implements DataFormat {
>>>
>>>         @Override
>>>         public void marshal(Exchange exchange, Object graph,
>>> OutputStream stream)
>>>                 throws Exception {
>>>             ObjectOutputStream out = null;
>>>
>>>             try {
>>>                 out = new ObjectOutputStream(stream);
>>>                 // Save the body
>>>                 out.writeObject(graph);
>>>                 // Save the message
>>>                 Message message = exchange.getIn();
>>>                 if (message != null) {
>>>                     // Message is not serializable, so
>>> save the data individually
>>>
>>> out.writeObject(message.getHeaders());
>>>
>>> out.writeObject(message.getAttachments());
>>>
>>> out.writeObject(message.getMessageId());
>>>                     out.writeObject(message.isFault());
>>>                 }
>>>             } finally {
>>>                 IOUtils.closeQuietly(out);
>>>             }
>>>         }
>>>
>>>         @SuppressWarnings("unchecked")
>>>         @Override
>>>         public Object unmarshal(Exchange exchange, InputStream
>>> stream)
>>>                 throws Exception {
>>>
>>>             ObjectInputStream in = null;
>>>             try {
>>>                 in = new ObjectInputStream(stream);
>>>
>>>                 // read the raw data
>>>                 Object body = in.readObject();
>>>                 Map<String,Object> headers = (Map<String,
>>> Object>) in.readObject();
>>>                 Map<String,DataHandler> attachments =
>>> (Map<String, DataHandler>) in.readObject();
>>>                 String messageId = (String) in.readObject();
>>>                 boolean fault = (boolean) in.readObject();
>>>
>>>                 // build the message
>>>                 Message msg = new DefaultMessage();
>>>                 msg.setBody(body);
>>>                 msg.setAttachments(attachments);
>>>                 msg.setHeaders(headers);
>>>                 msg.setMessageId(messageId);
>>>                 msg.setFault(fault);
>>>
>>>                 return msg;
>>>             } finally {
>>>                 IOUtils.closeQuietly(in);
>>>             }
>>>         }
>>>     }
>>>
>>> }
>>>
>>>> -----Ursprüngliche Nachricht-----
>>>> Von: Jan Matèrne (jhm) [mailto:apache@materne.de]
>>>> Gesendet: Donnerstag, 19. September 2013 07:06
>>>> An: users@camel.apache.org
>>>> Betreff: AW: How to set a header in custom DataFormat?
>>>>
>>>> What I have found is, that the exchange object which is passed to the
>>>> DataFormat is only used for getting the CamelContext.
>>>> I searched a little bit further and found a processor for
>>>> unmarshalling:
>>>>
>>>> org.apache.camel.processor.UnmarshalProcessor.process(Exchange,
>>>> AsyncCallback)
>>>>
>>>> And there is a note
>>>>
>>>>    Object result = dataFormat.unmarshal(exchange, stream);
>>>>      if (result instanceof Exchange) {
>>>>        if (result != exchange) {
>>>>          // it's not allowed to return another exchange other than the
>>>> one provided to dataFormat
>>>>          throw new RuntimeCamelException("The returned exchange " +
>>>> result + " is not the same as " + exchange + " provided to the
>>>> DataFormat");
>>>>
>>>>
>>>> The logic of that processor is:
>>>> - get the object from the stream
>>>> - if it is an exchange, throw that exception
>>>> - if it is a message, store as 'out' on the exchange-parameter
>>>> - if it is something else, store it as body of the 'out' message
>>>>
>>>>
>>>> So have you tried setting the header on the 'out' message?
>>>>
>>>>
>>>> Jan
>>>>
>>>>
>>>>
>>>>> -----Ursprüngliche Nachricht-----
>>>>> Von: Chris [mailto:cwolf.algo@gmail.com]
>>>>> Gesendet: Mittwoch, 18. September 2013 22:58
>>>>> An: users@camel.apache.org
>>>>> Betreff: How to set a header in custom DataFormat?
>>>>>
>>>>> I implemented a custom DataFormat and in the "unmarshal(Exchange
>>>>> exchange, InputStream stream)" implementation I set a header, but
>>>> upon
>>>>> attempting to retrieve the header downstream from the "unmarshal"
>>>>> call, the header is not there.  Since the pattern is inOnly, I added
>>>>> the header to the in Message.  I've done similar with custom
>>>>> Processors and in that case, it works.  What is different with
>>>>> DataFormat?  Is there any way to set header(s) from DataFormat
>>>> marshal/unmarshal?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Chris
>>>
>>>

Re: AW: How to set a header in custom DataFormat?

Posted by Chris <cw...@gmail.com>.
Ok, I solved the issue.  After reading the source code of
org.apache.camel.processor.UnmarshalProcessor, I saw this little piece 
of code:

             } else if (result instanceof Message) {
                 // the dataformat has probably set headers, 
attachments, etc. so let's use it as the outbound payload
                 exchange.setOut((Message) result);

Which means if a DataFormat implementation wants to set headers and/or 
attachments, the unmarshal implementation cannot directly return the 
results - it must create a new Message and set the results to body of 
the new Message and return this new Message instead.

I have to say, I never saw this documented anywhere nor any example in 
the book.  Of course I understand that there are a lot of higher 
priority items to do, so I'm not complaining ;)

Well here's the revised sample DataFormat where you *can* successfully 
set new headers in the unmarshal implementation:

Regards,

Chris


class CustomDataFormat2 implements DataFormat {

     @Override
     public Object unmarshal(Exchange exchange, InputStream stream)
             throws Exception {

         // To be able to set new headers and/or attachments, etc.
         // we need to wrap the result in a new Message and return
         // the new Message instead... who knew?
         Message msg = exchange.getIn().copy();

         List<String> result = new ArrayList<String>();

         BufferedReader in
             = new BufferedReader(new InputStreamReader(stream));

         String line = null;
         while((line = in.readLine()) != null) {
             result.add(line);
         }

         msg.setBody(result);

         // now it works!
         msg.setHeader("DISAPPEARING_DEMO_HEADER", "Will it be set?");
         return msg;
     }

     @Override
     public void marshal(Exchange exchange, Object graph, OutputStream 
stream)
             throws Exception {
         throw new UnsupportedOperationException("Not implemented.");
     }
}


On 9/20/2013 10:55 AM, Chris wrote:
> Jan,
>
> I really appreciate the help, unfortunately your test case does not even
> reproduce the issue I am having.  I think I may not have been clear in
> my original message.
>
> The issue is in a custom DataFormat, if a set a *new* header, that
> header will be gone from the Exchange, for the rest of the route,
> down-stream from the unmarshal (or marshal) call.
>
> So the issue is just setting a header and checking it it's still there.
> So the best way to recreate the problem is to create the most simplest
> DataFormat because marshal/unmarshal is not the issue - the issue is
> setting a new header *inside* the custom marshal/unmarshal methods. So I
> created a test case whose custom DataFormat does the bare minimum - no
> file I/O, no serialization - just string manipulation.
>
> Maybe you and/or other can have a look and explain why there's the issue.
>
> Thanks,
>
> Chris
>
> (the code formatting will be wrecked by the mailing list line length limit)
>
> import java.io.BufferedReader;
> import java.io.InputStream;
> import java.io.InputStreamReader;
> import java.io.OutputStream;
> import java.util.ArrayList;
> import java.util.List;
>
>
> import org.apache.camel.Exchange;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.impl.JndiRegistry;
> import org.apache.camel.spi.DataFormat;
> import org.apache.camel.test.junit4.CamelTestSupport;
> import org.junit.Test;
>
>
> public class SetHeaderDemo extends CamelTestSupport {
>
>      @Test
>      public void setHeaderInDataFormatProblem() throws Exception {
>          MockEndpoint mock = getMockEndpoint("mock:result");
>          mock.expectedMessageCount(1);
>          template.sendBodyAndHeader("direct:start", "one\ntwo\nthree\n",
> "DEMO_HEADER", "Hello...");
>
>          // This passes
>          assertExpression(mock.getReceivedExchanges().get(0),
>                  "simple", "${in.header.DEMO_HEADER}", "Hello...");
>
>          // This FAILS... WHY??? *************
>          assertExpression(mock.getReceivedExchanges().get(0),
>              "simple", "${in.header.DISAPPEARING_DEMO_HEADER}", "Will it
> be set?");
>
>          assertMockEndpointsSatisfied();
>      }
>
>
>      @Override
>      protected RouteBuilder createRouteBuilder() throws Exception {
>
>          return new RouteBuilder() {
>              @Override
>              public void configure() {
>                  from("direct:start")
>                  .unmarshal("customFmt") // <== will try to add a new
> header to Exchange IN msg
>
> .to("log://dataformat.demo?showAll=true&multiline=true&level=INFO") //
> <== not there
>                  .to("mock:result");
>              }
>          };
>      }
>
>      @Override
>      protected JndiRegistry createRegistry() throws Exception {
>          JndiRegistry registry = super.createRegistry();
>
>          CustomDataFormat fmt = new CustomDataFormat();
>          registry.bind("customFmt", fmt);
>          return registry;
>      }
> }
>
> /**
>   * Simplest, contrived DataFormat impl to demonstrate that it's impossible
>   * to set a new header in the DataFormat's implementation methods and
>   * see the newly added header down-stream from the marshal/unmarshal
>   * call(s).
>   *
>   */
> class CustomDataFormat implements DataFormat {
>
>      /**
>       * Expects the body to be a newline-delimited list of strings,
>       * which will be unmarshalled to a string array, whose elements
>       * are the "lines" in the "document".
>       *
>       * Obviously the marshal/unmarshal process is not important - the
>       * issue is that if a new header is added in the DataFormat marshal or
>       * unmarshal - it will be GONE after returning.
>       */
>      @Override
>      public Object unmarshal(Exchange exchange, InputStream stream)
>              throws Exception {
>          List<String> result = new ArrayList<String>();
>
>          BufferedReader in
>              = new BufferedReader(new InputStreamReader(stream));
>
>          String line = null;
>          while((line = in.readLine()) != null) {
>              result.add(line);
>          }
>
>          exchange.getIn().setHeader("DISAPPEARING_DEMO_HEADER", "Will it
> be set?");
>          return result;
>      }
>
>      @Override
>      public void marshal(Exchange exchange, Object graph, OutputStream
> stream)
>              throws Exception {
>          throw new UnsupportedOperationException("Not implemented.");
>      }
> }
>
>
>
>
> On 9/19/2013 4:13 AM, Jan Matèrne (jhm) wrote:
>> I tried building my own DF and that works (for me)
>>
>> Jan
>>
>>
>> package org.apache.camel.dataformat;
>>
>> import java.io.File;
>> import java.io.FileInputStream;
>> import java.io.FileNotFoundException;
>> import java.io.FileOutputStream;
>> import java.io.InputStream;
>> import java.io.ObjectInputStream;
>> import java.io.ObjectOutputStream;
>> import java.io.OutputStream;
>> import java.util.HashMap;
>> import java.util.Map;
>>
>> import javax.activation.DataHandler;
>>
>> import org.apache.camel.Exchange;
>> import org.apache.camel.Message;
>> import org.apache.camel.Processor;
>> import org.apache.camel.builder.ExchangeBuilder;
>> import org.apache.camel.builder.RouteBuilder;
>> import org.apache.camel.impl.DefaultMessage;
>> import org.apache.camel.spi.DataFormat;
>> import org.apache.camel.test.junit4.CamelTestSupport;
>> import org.apache.camel.util.FileUtil;
>> import org.apache.commons.io.IOUtils;
>> import org.junit.After;
>> import org.junit.Before;
>> import org.junit.Test;
>>
>> public class SaveHeaderTest extends CamelTestSupport {
>>
>>     private static File dataFile = new File("output/message.ser");
>>
>>     DataFormat customDataFormat = new CustomDataFormat();
>>
>>     @After
>>     @Before
>>     public void cleanup() {
>>         FileUtil.deleteFile(dataFile);
>>     }
>>
>>     @SuppressWarnings("unchecked")
>>     @Test
>>     public void save() throws FileNotFoundException, Exception {
>>         File writtenTo = new File("output/message.ser");
>>         assertFalse(writtenTo.exists());
>>         Exchange exchange = ExchangeBuilder.anExchange(context)
>>                 .withBody("Hello World")
>>                 .withHeader("from", "Apache Camel")
>>                 .withHeader("test", "save")
>>                 .withHeader(Exchange.FILE_NAME,
>> "message.ser")
>>                 .build();
>>         template.send("direct:save", exchange);
>>         assertTrue(writtenTo.exists());
>>
>>         // actual
>>         ObjectInputStream in = new ObjectInputStream(new
>> FileInputStream(dataFile));
>>         Object body = in.readObject();
>>         Map<String,Object> headers = (Map<String, Object>)
>> in.readObject();
>>         Map<String,DataHandler> attachments = (Map<String,
>> DataHandler>) in.readObject();
>>         String messageId = (String) in.readObject();
>>         boolean fault = (boolean) in.readObject();
>>
>>         assertEquals("Hello World", body);
>>         assertEquals("Apache Camel", headers.get("from"));
>>         assertEquals("save", headers.get("test"));
>>         assertTrue(attachments.isEmpty());
>>         assertEquals(exchange.getIn().getMessageId(), messageId);
>>         assertFalse(fault);
>>     }
>>
>>     @Test
>>     public void load() throws Exception {
>>         // create test data
>>         ObjectOutputStream out = new ObjectOutputStream(new
>> FileOutputStream(dataFile));
>>         out.writeObject("Hello World");
>>
>>         Map<String,Object> headers = new HashMap<String, Object>();
>>         headers.put("from", "Apache Camel");
>>         headers.put("test", "load");
>>         out.writeObject(headers);
>>
>>         Map<String,DataHandler> attachments = new
>> HashMap<String,DataHandler>();
>>         out.writeObject(attachments);
>>
>>         out.writeObject("message-id");
>>         out.writeObject(false);
>>         out.close();
>>
>>         // load the saved 'exchange'
>>         Exchange loadCommand = ExchangeBuilder.anExchange(context)
>>                 .withBody(dataFile.getAbsolutePath())
>>                 .build();
>>         Exchange response = template.send("direct:load",
>> loadCommand);
>>         assertEquals("Hello World",
>> response.getOut().getBody(String.class));
>>         assertEquals("Apache Camel",
>> response.getOut().getHeader("from"));
>>         assertEquals("load", response.getOut().getHeader("test"));
>>     }
>>
>>     @Test
>>     public void turnaround() {
>>         Exchange exchange = ExchangeBuilder.anExchange(context)
>>                 .withBody("Hello World")
>>                 .withHeader("from", "Apache Camel")
>>                 .build();
>>         Exchange response = template.send("direct:turnaround",
>> exchange);
>>         assertEquals("Hello World",
>> response.getOut().getBody(String.class));
>>         assertEquals("Apache Camel",
>> response.getOut().getHeader("from"));
>>     }
>>
>>
>>     @Override
>>     protected RouteBuilder createRouteBuilder() throws Exception {
>>         return new RouteBuilder() {
>>             @Override
>>             public void configure() throws Exception {
>>                 from("direct:save")
>>                     .marshal(customDataFormat)
>>                     .to("file:output");
>>                 from("direct:load")
>>                     .process(openFile())
>>                     .unmarshal(customDataFormat);
>>                 from("direct:turnaround")
>>                     .marshal(customDataFormat)
>>                     .unmarshal(customDataFormat);
>>             }
>>
>>             private Processor openFile() {
>>                 return new Processor() {
>>                     @Override
>>                     public void process(Exchange
>> exchange) throws Exception {
>>                         String filename =
>> exchange.getIn().getBody(String.class);
>>                         FileInputStream stream = new
>> FileInputStream(filename);
>>
>> exchange.getIn().setBody(stream);
>>                     }
>>                 };
>>             }
>>         };
>>     }
>>
>>     class CustomDataFormat implements DataFormat {
>>
>>         @Override
>>         public void marshal(Exchange exchange, Object graph,
>> OutputStream stream)
>>                 throws Exception {
>>             ObjectOutputStream out = null;
>>
>>             try {
>>                 out = new ObjectOutputStream(stream);
>>                 // Save the body
>>                 out.writeObject(graph);
>>                 // Save the message
>>                 Message message = exchange.getIn();
>>                 if (message != null) {
>>                     // Message is not serializable, so
>> save the data individually
>>
>> out.writeObject(message.getHeaders());
>>
>> out.writeObject(message.getAttachments());
>>
>> out.writeObject(message.getMessageId());
>>                     out.writeObject(message.isFault());
>>                 }
>>             } finally {
>>                 IOUtils.closeQuietly(out);
>>             }
>>         }
>>
>>         @SuppressWarnings("unchecked")
>>         @Override
>>         public Object unmarshal(Exchange exchange, InputStream
>> stream)
>>                 throws Exception {
>>
>>             ObjectInputStream in = null;
>>             try {
>>                 in = new ObjectInputStream(stream);
>>
>>                 // read the raw data
>>                 Object body = in.readObject();
>>                 Map<String,Object> headers = (Map<String,
>> Object>) in.readObject();
>>                 Map<String,DataHandler> attachments =
>> (Map<String, DataHandler>) in.readObject();
>>                 String messageId = (String) in.readObject();
>>                 boolean fault = (boolean) in.readObject();
>>
>>                 // build the message
>>                 Message msg = new DefaultMessage();
>>                 msg.setBody(body);
>>                 msg.setAttachments(attachments);
>>                 msg.setHeaders(headers);
>>                 msg.setMessageId(messageId);
>>                 msg.setFault(fault);
>>
>>                 return msg;
>>             } finally {
>>                 IOUtils.closeQuietly(in);
>>             }
>>         }
>>     }
>>
>> }
>>
>>> -----Ursprüngliche Nachricht-----
>>> Von: Jan Matèrne (jhm) [mailto:apache@materne.de]
>>> Gesendet: Donnerstag, 19. September 2013 07:06
>>> An: users@camel.apache.org
>>> Betreff: AW: How to set a header in custom DataFormat?
>>>
>>> What I have found is, that the exchange object which is passed to the
>>> DataFormat is only used for getting the CamelContext.
>>> I searched a little bit further and found a processor for
>>> unmarshalling:
>>>
>>> org.apache.camel.processor.UnmarshalProcessor.process(Exchange,
>>> AsyncCallback)
>>>
>>> And there is a note
>>>
>>>    Object result = dataFormat.unmarshal(exchange, stream);
>>>      if (result instanceof Exchange) {
>>>        if (result != exchange) {
>>>          // it's not allowed to return another exchange other than the
>>> one provided to dataFormat
>>>          throw new RuntimeCamelException("The returned exchange " +
>>> result + " is not the same as " + exchange + " provided to the
>>> DataFormat");
>>>
>>>
>>> The logic of that processor is:
>>> - get the object from the stream
>>> - if it is an exchange, throw that exception
>>> - if it is a message, store as 'out' on the exchange-parameter
>>> - if it is something else, store it as body of the 'out' message
>>>
>>>
>>> So have you tried setting the header on the 'out' message?
>>>
>>>
>>> Jan
>>>
>>>
>>>
>>>> -----Ursprüngliche Nachricht-----
>>>> Von: Chris [mailto:cwolf.algo@gmail.com]
>>>> Gesendet: Mittwoch, 18. September 2013 22:58
>>>> An: users@camel.apache.org
>>>> Betreff: How to set a header in custom DataFormat?
>>>>
>>>> I implemented a custom DataFormat and in the "unmarshal(Exchange
>>>> exchange, InputStream stream)" implementation I set a header, but
>>> upon
>>>> attempting to retrieve the header downstream from the "unmarshal"
>>>> call, the header is not there.  Since the pattern is inOnly, I added
>>>> the header to the in Message.  I've done similar with custom
>>>> Processors and in that case, it works.  What is different with
>>>> DataFormat?  Is there any way to set header(s) from DataFormat
>>> marshal/unmarshal?
>>>>
>>>> Thanks,
>>>>
>>>> Chris
>>
>>

Re: AW: How to set a header in custom DataFormat?

Posted by Chris <cw...@gmail.com>.
Jan,

I really appreciate the help, unfortunately your test case does not even 
reproduce the issue I am having.  I think I may not have been clear in 
my original message.

The issue is in a custom DataFormat, if a set a *new* header, that 
header will be gone from the Exchange, for the rest of the route, 
down-stream from the unmarshal (or marshal) call.

So the issue is just setting a header and checking it it's still there. 
So the best way to recreate the problem is to create the most simplest 
DataFormat because marshal/unmarshal is not the issue - the issue is 
setting a new header *inside* the custom marshal/unmarshal methods. So I 
created a test case whose custom DataFormat does the bare minimum - no 
file I/O, no serialization - just string manipulation.

Maybe you and/or other can have a look and explain why there's the issue.

Thanks,

Chris

(the code formatting will be wrecked by the mailing list line length limit)

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;


import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.JndiRegistry;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;


public class SetHeaderDemo extends CamelTestSupport {

     @Test
     public void setHeaderInDataFormatProblem() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
         template.sendBodyAndHeader("direct:start", "one\ntwo\nthree\n", 
"DEMO_HEADER", "Hello...");

         // This passes
         assertExpression(mock.getReceivedExchanges().get(0),
                 "simple", "${in.header.DEMO_HEADER}", "Hello...");

         // This FAILS... WHY??? *************
         assertExpression(mock.getReceivedExchanges().get(0),
             "simple", "${in.header.DISAPPEARING_DEMO_HEADER}", "Will it 
be set?");

         assertMockEndpointsSatisfied();
     }


     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {

         return new RouteBuilder() {
             @Override
             public void configure() {
                 from("direct:start")
                 .unmarshal("customFmt") // <== will try to add a new 
header to Exchange IN msg
 
.to("log://dataformat.demo?showAll=true&multiline=true&level=INFO") // 
<== not there
                 .to("mock:result");
             }
         };
     }

     @Override
     protected JndiRegistry createRegistry() throws Exception {
         JndiRegistry registry = super.createRegistry();

         CustomDataFormat fmt = new CustomDataFormat();
         registry.bind("customFmt", fmt);
         return registry;
     }
}

/**
  * Simplest, contrived DataFormat impl to demonstrate that it's impossible
  * to set a new header in the DataFormat's implementation methods and
  * see the newly added header down-stream from the marshal/unmarshal
  * call(s).
  *
  */
class CustomDataFormat implements DataFormat {

     /**
      * Expects the body to be a newline-delimited list of strings,
      * which will be unmarshalled to a string array, whose elements
      * are the "lines" in the "document".
      *
      * Obviously the marshal/unmarshal process is not important - the
      * issue is that if a new header is added in the DataFormat marshal or
      * unmarshal - it will be GONE after returning.
      */
     @Override
     public Object unmarshal(Exchange exchange, InputStream stream)
             throws Exception {
         List<String> result = new ArrayList<String>();

         BufferedReader in
             = new BufferedReader(new InputStreamReader(stream));

         String line = null;
         while((line = in.readLine()) != null) {
             result.add(line);
         }

         exchange.getIn().setHeader("DISAPPEARING_DEMO_HEADER", "Will it 
be set?");
         return result;
     }

     @Override
     public void marshal(Exchange exchange, Object graph, OutputStream 
stream)
             throws Exception {
         throw new UnsupportedOperationException("Not implemented.");
     }
}




On 9/19/2013 4:13 AM, Jan Matèrne (jhm) wrote:
> I tried building my own DF and that works (for me)
>
> Jan
>
>
> package org.apache.camel.dataformat;
>
> import java.io.File;
> import java.io.FileInputStream;
> import java.io.FileNotFoundException;
> import java.io.FileOutputStream;
> import java.io.InputStream;
> import java.io.ObjectInputStream;
> import java.io.ObjectOutputStream;
> import java.io.OutputStream;
> import java.util.HashMap;
> import java.util.Map;
>
> import javax.activation.DataHandler;
>
> import org.apache.camel.Exchange;
> import org.apache.camel.Message;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.ExchangeBuilder;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultMessage;
> import org.apache.camel.spi.DataFormat;
> import org.apache.camel.test.junit4.CamelTestSupport;
> import org.apache.camel.util.FileUtil;
> import org.apache.commons.io.IOUtils;
> import org.junit.After;
> import org.junit.Before;
> import org.junit.Test;
>
> public class SaveHeaderTest extends CamelTestSupport {
> 	
> 	private static File dataFile = new File("output/message.ser");
> 	
> 	DataFormat customDataFormat = new CustomDataFormat();
> 	
> 	@After
> 	@Before
> 	public void cleanup() {
> 		FileUtil.deleteFile(dataFile);
> 	}
> 	
> 	@SuppressWarnings("unchecked")
> 	@Test
> 	public void save() throws FileNotFoundException, Exception {
> 		File writtenTo = new File("output/message.ser");
> 		assertFalse(writtenTo.exists());
> 		Exchange exchange = ExchangeBuilder.anExchange(context)
> 				.withBody("Hello World")
> 				.withHeader("from", "Apache Camel")
> 				.withHeader("test", "save")
> 				.withHeader(Exchange.FILE_NAME,
> "message.ser")
> 				.build();
> 		template.send("direct:save", exchange);
> 		assertTrue(writtenTo.exists());
>
> 		// actual
> 		ObjectInputStream in = new ObjectInputStream(new
> FileInputStream(dataFile));
> 		Object body = in.readObject();
> 		Map<String,Object> headers = (Map<String, Object>)
> in.readObject();
> 		Map<String,DataHandler> attachments = (Map<String,
> DataHandler>) in.readObject();
> 		String messageId = (String) in.readObject();
> 		boolean fault = (boolean) in.readObject();
> 		
> 		assertEquals("Hello World", body);
> 		assertEquals("Apache Camel", headers.get("from"));
> 		assertEquals("save", headers.get("test"));
> 		assertTrue(attachments.isEmpty());
> 		assertEquals(exchange.getIn().getMessageId(), messageId);
> 		assertFalse(fault);
> 	}
> 	
> 	@Test
> 	public void load() throws Exception {
> 		// create test data
> 		ObjectOutputStream out = new ObjectOutputStream(new
> FileOutputStream(dataFile));
> 		out.writeObject("Hello World");
> 		
> 		Map<String,Object> headers = new HashMap<String, Object>();
> 		headers.put("from", "Apache Camel");
> 		headers.put("test", "load");
> 		out.writeObject(headers);
> 		
> 		Map<String,DataHandler> attachments = new
> HashMap<String,DataHandler>();
> 		out.writeObject(attachments);
> 		
> 		out.writeObject("message-id");
> 		out.writeObject(false);
> 		out.close();
> 		
> 		// load the saved 'exchange'
> 		Exchange loadCommand = ExchangeBuilder.anExchange(context)
> 				.withBody(dataFile.getAbsolutePath())
> 				.build();
> 		Exchange response = template.send("direct:load",
> loadCommand);
> 		assertEquals("Hello World",
> response.getOut().getBody(String.class));
> 		assertEquals("Apache Camel",
> response.getOut().getHeader("from"));
> 		assertEquals("load", response.getOut().getHeader("test"));
> 	}
> 	
> 	@Test
> 	public void turnaround() {
> 		Exchange exchange = ExchangeBuilder.anExchange(context)
> 				.withBody("Hello World")
> 				.withHeader("from", "Apache Camel")
> 				.build();
> 		Exchange response = template.send("direct:turnaround",
> exchange);
> 		assertEquals("Hello World",
> response.getOut().getBody(String.class));
> 		assertEquals("Apache Camel",
> response.getOut().getHeader("from"));
> 	}
> 	
> 	
> 	@Override
> 	protected RouteBuilder createRouteBuilder() throws Exception {
> 		return new RouteBuilder() {
> 			@Override
> 			public void configure() throws Exception {
> 				from("direct:save")
> 					.marshal(customDataFormat)
> 					.to("file:output");
> 				from("direct:load")
> 					.process(openFile())
> 					.unmarshal(customDataFormat);
> 				from("direct:turnaround")
> 					.marshal(customDataFormat)
> 					.unmarshal(customDataFormat);
> 			}
>
> 			private Processor openFile() {
> 				return new Processor() {
> 					@Override
> 					public void process(Exchange
> exchange) throws Exception {
> 						String filename =
> exchange.getIn().getBody(String.class);
> 						FileInputStream stream = new
> FileInputStream(filename);
> 	
> exchange.getIn().setBody(stream);
> 					}
> 				};
> 			}
> 		};
> 	}
> 	
> 	class CustomDataFormat implements DataFormat {
> 		
> 		@Override
> 		public void marshal(Exchange exchange, Object graph,
> OutputStream stream)
> 				throws Exception {
> 			ObjectOutputStream out = null;
> 			
> 			try {
> 				out = new ObjectOutputStream(stream);
> 				// Save the body
> 				out.writeObject(graph);
> 				// Save the message
> 				Message message = exchange.getIn();
> 				if (message != null) {
> 					// Message is not serializable, so
> save the data individually
> 	
> out.writeObject(message.getHeaders());
> 	
> out.writeObject(message.getAttachments());
> 	
> out.writeObject(message.getMessageId());
> 					out.writeObject(message.isFault());
> 				}
> 			} finally {
> 				IOUtils.closeQuietly(out);
> 			}
> 		}
>
> 		@SuppressWarnings("unchecked")
> 		@Override
> 		public Object unmarshal(Exchange exchange, InputStream
> stream)
> 				throws Exception {
> 			
> 			ObjectInputStream in = null;
> 			try {
> 				in = new ObjectInputStream(stream);
> 				
> 				// read the raw data
> 				Object body = in.readObject();
> 				Map<String,Object> headers = (Map<String,
> Object>) in.readObject();
> 				Map<String,DataHandler> attachments =
> (Map<String, DataHandler>) in.readObject();
> 				String messageId = (String) in.readObject();
> 				boolean fault = (boolean) in.readObject();
> 				
> 				// build the message
> 				Message msg = new DefaultMessage();
> 				msg.setBody(body);
> 				msg.setAttachments(attachments);
> 				msg.setHeaders(headers);
> 				msg.setMessageId(messageId);
> 				msg.setFault(fault);
> 				
> 				return msg;
> 			} finally {
> 				IOUtils.closeQuietly(in);
> 			}
> 		}
> 	}
> 	
> }
>
>> -----Ursprüngliche Nachricht-----
>> Von: Jan Matèrne (jhm) [mailto:apache@materne.de]
>> Gesendet: Donnerstag, 19. September 2013 07:06
>> An: users@camel.apache.org
>> Betreff: AW: How to set a header in custom DataFormat?
>>
>> What I have found is, that the exchange object which is passed to the
>> DataFormat is only used for getting the CamelContext.
>> I searched a little bit further and found a processor for
>> unmarshalling:
>>
>> org.apache.camel.processor.UnmarshalProcessor.process(Exchange,
>> AsyncCallback)
>>
>> And there is a note
>>
>>    Object result = dataFormat.unmarshal(exchange, stream);
>>      if (result instanceof Exchange) {
>>        if (result != exchange) {
>>          // it's not allowed to return another exchange other than the
>> one provided to dataFormat
>>          throw new RuntimeCamelException("The returned exchange " +
>> result + " is not the same as " + exchange + " provided to the
>> DataFormat");
>>
>>
>> The logic of that processor is:
>> - get the object from the stream
>> - if it is an exchange, throw that exception
>> - if it is a message, store as 'out' on the exchange-parameter
>> - if it is something else, store it as body of the 'out' message
>>
>>
>> So have you tried setting the header on the 'out' message?
>>
>>
>> Jan
>>
>>
>>
>>> -----Ursprüngliche Nachricht-----
>>> Von: Chris [mailto:cwolf.algo@gmail.com]
>>> Gesendet: Mittwoch, 18. September 2013 22:58
>>> An: users@camel.apache.org
>>> Betreff: How to set a header in custom DataFormat?
>>>
>>> I implemented a custom DataFormat and in the "unmarshal(Exchange
>>> exchange, InputStream stream)" implementation I set a header, but
>> upon
>>> attempting to retrieve the header downstream from the "unmarshal"
>>> call, the header is not there.  Since the pattern is inOnly, I added
>>> the header to the in Message.  I've done similar with custom
>>> Processors and in that case, it works.  What is different with
>>> DataFormat?  Is there any way to set header(s) from DataFormat
>> marshal/unmarshal?
>>>
>>> Thanks,
>>>
>>> Chris
>
>

AW: How to set a header in custom DataFormat?

Posted by "Jan Matèrne (jhm)" <ap...@materne.de>.
I tried building my own DF and that works (for me)

Jan


package org.apache.camel.dataformat;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;

import javax.activation.DataHandler;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.ExchangeBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.apache.camel.util.FileUtil;
import org.apache.commons.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class SaveHeaderTest extends CamelTestSupport {
	
	private static File dataFile = new File("output/message.ser");
	
	DataFormat customDataFormat = new CustomDataFormat();
	
	@After
	@Before
	public void cleanup() {
		FileUtil.deleteFile(dataFile);
	}
	
	@SuppressWarnings("unchecked")
	@Test
	public void save() throws FileNotFoundException, Exception {
		File writtenTo = new File("output/message.ser");
		assertFalse(writtenTo.exists());
		Exchange exchange = ExchangeBuilder.anExchange(context)
				.withBody("Hello World")
				.withHeader("from", "Apache Camel")
				.withHeader("test", "save")
				.withHeader(Exchange.FILE_NAME,
"message.ser")
				.build();
		template.send("direct:save", exchange);
		assertTrue(writtenTo.exists());

		// actual
		ObjectInputStream in = new ObjectInputStream(new
FileInputStream(dataFile));
		Object body = in.readObject();
		Map<String,Object> headers = (Map<String, Object>)
in.readObject();
		Map<String,DataHandler> attachments = (Map<String,
DataHandler>) in.readObject();
		String messageId = (String) in.readObject();
		boolean fault = (boolean) in.readObject();
		
		assertEquals("Hello World", body);
		assertEquals("Apache Camel", headers.get("from"));
		assertEquals("save", headers.get("test"));
		assertTrue(attachments.isEmpty());
		assertEquals(exchange.getIn().getMessageId(), messageId);
		assertFalse(fault);
	}
	
	@Test
	public void load() throws Exception {
		// create test data
		ObjectOutputStream out = new ObjectOutputStream(new
FileOutputStream(dataFile));
		out.writeObject("Hello World");
		
		Map<String,Object> headers = new HashMap<String, Object>();
		headers.put("from", "Apache Camel");
		headers.put("test", "load");
		out.writeObject(headers);
		
		Map<String,DataHandler> attachments = new
HashMap<String,DataHandler>();
		out.writeObject(attachments);
		
		out.writeObject("message-id");
		out.writeObject(false);
		out.close();
		
		// load the saved 'exchange'
		Exchange loadCommand = ExchangeBuilder.anExchange(context)
				.withBody(dataFile.getAbsolutePath())
				.build();
		Exchange response = template.send("direct:load",
loadCommand);
		assertEquals("Hello World",
response.getOut().getBody(String.class));
		assertEquals("Apache Camel",
response.getOut().getHeader("from"));
		assertEquals("load", response.getOut().getHeader("test"));
	}
	
	@Test
	public void turnaround() {
		Exchange exchange = ExchangeBuilder.anExchange(context)
				.withBody("Hello World")
				.withHeader("from", "Apache Camel")
				.build();
		Exchange response = template.send("direct:turnaround",
exchange);
		assertEquals("Hello World",
response.getOut().getBody(String.class));
		assertEquals("Apache Camel",
response.getOut().getHeader("from"));
	}
	
	
	@Override
	protected RouteBuilder createRouteBuilder() throws Exception {
		return new RouteBuilder() {
			@Override
			public void configure() throws Exception {
				from("direct:save")
					.marshal(customDataFormat)
					.to("file:output");
				from("direct:load")
					.process(openFile())
					.unmarshal(customDataFormat);
				from("direct:turnaround")
					.marshal(customDataFormat)
					.unmarshal(customDataFormat);
			}

			private Processor openFile() {
				return new Processor() {
					@Override
					public void process(Exchange
exchange) throws Exception {
						String filename =
exchange.getIn().getBody(String.class);
						FileInputStream stream = new
FileInputStream(filename);
	
exchange.getIn().setBody(stream);
					}
				};
			}
		};
	}
	
	class CustomDataFormat implements DataFormat {
		
		@Override
		public void marshal(Exchange exchange, Object graph,
OutputStream stream)
				throws Exception {
			ObjectOutputStream out = null;
			
			try {
				out = new ObjectOutputStream(stream);
				// Save the body
				out.writeObject(graph);
				// Save the message
				Message message = exchange.getIn();
				if (message != null) {
					// Message is not serializable, so
save the data individually
	
out.writeObject(message.getHeaders());
	
out.writeObject(message.getAttachments());
	
out.writeObject(message.getMessageId());
					out.writeObject(message.isFault());
				}
			} finally {
				IOUtils.closeQuietly(out);
			}
		}

		@SuppressWarnings("unchecked")
		@Override
		public Object unmarshal(Exchange exchange, InputStream
stream)
				throws Exception {
			
			ObjectInputStream in = null;
			try {
				in = new ObjectInputStream(stream);
				
				// read the raw data
				Object body = in.readObject();
				Map<String,Object> headers = (Map<String,
Object>) in.readObject();
				Map<String,DataHandler> attachments =
(Map<String, DataHandler>) in.readObject();
				String messageId = (String) in.readObject();
				boolean fault = (boolean) in.readObject();
				
				// build the message
				Message msg = new DefaultMessage();
				msg.setBody(body);
				msg.setAttachments(attachments);
				msg.setHeaders(headers);
				msg.setMessageId(messageId);
				msg.setFault(fault);
				
				return msg;
			} finally {
				IOUtils.closeQuietly(in);
			}
		}
	}
	
}

> -----Ursprüngliche Nachricht-----
> Von: Jan Matèrne (jhm) [mailto:apache@materne.de]
> Gesendet: Donnerstag, 19. September 2013 07:06
> An: users@camel.apache.org
> Betreff: AW: How to set a header in custom DataFormat?
> 
> What I have found is, that the exchange object which is passed to the
> DataFormat is only used for getting the CamelContext.
> I searched a little bit further and found a processor for
> unmarshalling:
> 
> org.apache.camel.processor.UnmarshalProcessor.process(Exchange,
> AsyncCallback)
> 
> And there is a note
> 
>   Object result = dataFormat.unmarshal(exchange, stream);
>     if (result instanceof Exchange) {
>       if (result != exchange) {
>         // it's not allowed to return another exchange other than the
> one provided to dataFormat
>         throw new RuntimeCamelException("The returned exchange " +
> result + " is not the same as " + exchange + " provided to the
> DataFormat");
> 
> 
> The logic of that processor is:
> - get the object from the stream
> - if it is an exchange, throw that exception
> - if it is a message, store as 'out' on the exchange-parameter
> - if it is something else, store it as body of the 'out' message
> 
> 
> So have you tried setting the header on the 'out' message?
> 
> 
> Jan
> 
> 
> 
> > -----Ursprüngliche Nachricht-----
> > Von: Chris [mailto:cwolf.algo@gmail.com]
> > Gesendet: Mittwoch, 18. September 2013 22:58
> > An: users@camel.apache.org
> > Betreff: How to set a header in custom DataFormat?
> >
> > I implemented a custom DataFormat and in the "unmarshal(Exchange
> > exchange, InputStream stream)" implementation I set a header, but
> upon
> > attempting to retrieve the header downstream from the "unmarshal"
> > call, the header is not there.  Since the pattern is inOnly, I added
> > the header to the in Message.  I've done similar with custom
> > Processors and in that case, it works.  What is different with
> > DataFormat?  Is there any way to set header(s) from DataFormat
> marshal/unmarshal?
> >
> > Thanks,
> >
> > Chris



AW: How to set a header in custom DataFormat?

Posted by "Jan Matèrne (jhm)" <ap...@materne.de>.
What I have found is, that the exchange object which is passed to the
DataFormat is only used for getting the CamelContext.
I searched a little bit further and found a processor for unmarshalling:

org.apache.camel.processor.UnmarshalProcessor.process(Exchange,
AsyncCallback)

And there is a note

  Object result = dataFormat.unmarshal(exchange, stream);
    if (result instanceof Exchange) {
      if (result != exchange) {
        // it's not allowed to return another exchange other than the one
provided to dataFormat
        throw new RuntimeCamelException("The returned exchange " + result +
" is not the same as " + exchange + " provided to the DataFormat");


The logic of that processor is:
- get the object from the stream
- if it is an exchange, throw that exception
- if it is a message, store as 'out' on the exchange-parameter
- if it is something else, store it as body of the 'out' message


So have you tried setting the header on the 'out' message?


Jan



> -----Ursprüngliche Nachricht-----
> Von: Chris [mailto:cwolf.algo@gmail.com]
> Gesendet: Mittwoch, 18. September 2013 22:58
> An: users@camel.apache.org
> Betreff: How to set a header in custom DataFormat?
> 
> I implemented a custom DataFormat and in the "unmarshal(Exchange
> exchange, InputStream stream)" implementation I set a header, but upon
> attempting to retrieve the header downstream from the "unmarshal" call,
> the header is not there.  Since the pattern is inOnly, I added the
> header to the in Message.  I've done similar with custom Processors and
> in that case, it works.  What is different with DataFormat?  Is there
> any way to set header(s) from DataFormat marshal/unmarshal?
> 
> Thanks,
> 
> Chris