You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2013/04/05 15:09:04 UTC
svn commit: r1464963 - in /camel/trunk/components/camel-krati/src:
main/java/org/apache/camel/component/krati/
main/java/org/apache/camel/component/krati/processor/idempotent/
main/java/org/apache/camel/component/krati/serializer/
test/java/org/apache/...
Author: davsclaus
Date: Fri Apr 5 13:09:04 2013
New Revision: 1464963
URL: http://svn.apache.org/r1464963
Log:
CAMEL-6242: Krati component - Should preserve headers
Modified:
camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiDataStoreRegistration.java
camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiHelper.java
camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiProducer.java
camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/processor/idempotent/KratiIdempotentRepository.java
camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/serializer/KratiDefaultSerializer.java
camel/trunk/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java
Modified: camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java?rev=1464963&r1=1464962&r2=1464963&view=diff
==============================================================================
--- camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java (original)
+++ camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java Fri Apr 5 13:09:04 2013
@@ -40,7 +40,6 @@ public class KratiConsumer extends Sched
protected final KratiEndpoint endpoint;
protected DataStore<Object, Object> dataStore;
- protected int maxMessagesPerPoll = 10;
public KratiConsumer(KratiEndpoint endpoint, Processor processor, DataStore<Object, Object> dataStore) {
super(endpoint, processor);
@@ -88,12 +87,12 @@ public class KratiConsumer extends Sched
try {
dataStore.delete(exchange.getProperty(KratiConstants.KEY));
} catch (Exception e) {
- LOG.warn("Failed to remove from datastore.", e);
+ LOG.warn("Failed to remove from datastore. This exception is ignored.", e);
}
}
public void onFailure(Exchange exchange) {
- //emtpy
+ // noop
}
});
Modified: camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiDataStoreRegistration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiDataStoreRegistration.java?rev=1464963&r1=1464962&r2=1464963&view=diff
==============================================================================
--- camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiDataStoreRegistration.java (original)
+++ camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiDataStoreRegistration.java Fri Apr 5 13:09:04 2013
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.camel.component.krati;
import java.io.IOException;
@@ -42,7 +41,7 @@ public class KratiDataStoreRegistration
try {
dataStore.close();
} catch (IOException e) {
- LOG.warn("Error while closing datastore.", e);
+ LOG.warn("Error while closing datastore. This exception is ignored.", e);
}
return true;
} else {
Modified: camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiHelper.java?rev=1464963&r1=1464962&r2=1464963&view=diff
==============================================================================
--- camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiHelper.java (original)
+++ camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiHelper.java Fri Apr 5 13:09:04 2013
@@ -82,7 +82,7 @@ public final class KratiHelper {
try {
result = new DynamicDataSet(homeDir, initialCapacity, segmentFactory);
} catch (Exception e) {
- throw new RuntimeCamelException("Failed to create Krati DataSet.", e);
+ throw new RuntimeCamelException("Failed to create Krati DataSet.", e);
}
return result;
}
Modified: camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiProducer.java?rev=1464963&r1=1464962&r2=1464963&view=diff
==============================================================================
--- camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiProducer.java (original)
+++ camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiProducer.java Fri Apr 5 13:09:04 2013
@@ -28,8 +28,8 @@ import org.slf4j.LoggerFactory;
public class KratiProducer extends DefaultProducer {
private static final transient Logger LOG = LoggerFactory.getLogger(KratiProducer.class);
- protected KratiEndpoint endpoint;
- protected DataStore<Object, Object> dataStore;
+ protected final KratiEndpoint endpoint;
+ protected final DataStore<Object, Object> dataStore;
public KratiProducer(KratiEndpoint endpoint, DataStore<Object, Object> dataStore) {
super(endpoint);
@@ -43,6 +43,9 @@ public class KratiProducer extends Defau
LOG.trace("Processing {} operation on '[{}]'", operation, exchange);
if (KratiConstants.KRATI_OPERATION_GET.equals(operation) && key != null) {
+ // preserve headers and attachments
+ exchange.getOut().setHeaders(exchange.getIn().getHeaders());
+ exchange.getOut().setAttachments(exchange.getIn().getAttachments());
exchange.getOut().setBody(dataStore.get(key));
} else if (KratiConstants.KRATI_OPERATION_DELETE.equals(operation) && key != null) {
boolean status;
@@ -51,16 +54,23 @@ public class KratiProducer extends Defau
dataStore.persist();
}
if (status) {
+ exchange.getOut().setHeaders(exchange.getIn().getHeaders());
+ exchange.getOut().setAttachments(exchange.getIn().getAttachments());
exchange.getOut().setHeader(KratiConstants.KRATI_OPERATION_STATUS, KratiConstants.KRATI_OPERATION_SUCESSFUL);
} else {
+ exchange.getOut().setHeaders(exchange.getIn().getHeaders());
+ exchange.getOut().setAttachments(exchange.getIn().getAttachments());
exchange.getOut().setHeader(KratiConstants.KRATI_OPERATION_STATUS, KratiConstants.KRATI_OPERATION_FAILURE);
}
} else if (KratiConstants.KRATI_OPERATION_DELETEALL.equals(operation)) {
try {
dataStore.clear();
+ exchange.getOut().setHeaders(exchange.getIn().getHeaders());
+ exchange.getOut().setAttachments(exchange.getIn().getAttachments());
exchange.getOut().setHeader(KratiConstants.KRATI_OPERATION_STATUS, KratiConstants.KRATI_OPERATION_SUCESSFUL);
} catch (Exception e) {
LOG.warn("Error clearing all entries from store", e);
+ // Ignoring the exception here is not ideal: the end user has no access to it and cannot use Camel error handling
exchange.getOut().setHeader(KratiConstants.KRATI_OPERATION_STATUS, KratiConstants.KRATI_OPERATION_FAILURE);
}
} else {
@@ -74,9 +84,6 @@ public class KratiProducer extends Defau
/**
* Retrieves the operation from the URI or from the exchange headers. The header will take precedence over the URI.
- *
- * @param exchange
- * @return
*/
public String getOperation(Exchange exchange) {
String operation = ((KratiEndpoint) getEndpoint()).getOperation();
@@ -90,9 +97,6 @@ public class KratiProducer extends Defau
/**
* Retrieves the key from the URI or from the exchange headers. The header will take precedence over the URI.
- *
- * @param exchange
- * @return
*/
public Object getKey(Exchange exchange) {
Object key = ((KratiEndpoint) getEndpoint()).getKey();
@@ -105,9 +109,6 @@ public class KratiProducer extends Defau
/**
* Retrieves the value from the URI or from the exchange headers/body. The header/body will take precedence over the URI.
- *
- * @param exchange
- * @return
*/
public Object getValue(Exchange exchange) {
Object value = ((KratiEndpoint) getEndpoint()).getValue();
Modified: camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/processor/idempotent/KratiIdempotentRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/processor/idempotent/KratiIdempotentRepository.java?rev=1464963&r1=1464962&r2=1464963&view=diff
==============================================================================
--- camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/processor/idempotent/KratiIdempotentRepository.java (original)
+++ camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/processor/idempotent/KratiIdempotentRepository.java Fri Apr 5 13:09:04 2013
@@ -57,7 +57,7 @@ public class KratiIdempotentRepository e
}
} catch (Exception e) {
- LOG.warn("Error adding item to krati idempotent repository. This exception is ignored.", e);
+ LOG.warn("Error adding item to Krati idempotent repository. This exception is ignored.", e);
return false;
}
}
@@ -69,7 +69,7 @@ public class KratiIdempotentRepository e
try {
return dataSet.has(bytes);
} catch (Exception e) {
- LOG.warn("Error checking item exists in krati idempotent repository. This exception is ignored.", e);
+ LOG.warn("Error checking item exists in Krati idempotent repository. This exception is ignored.", e);
return false;
}
}
@@ -81,7 +81,7 @@ public class KratiIdempotentRepository e
try {
return dataSet.delete(bytes);
} catch (Exception e) {
- LOG.warn("Error removing item from krati idempotent repository. This exception is ignored.", e);
+ LOG.warn("Error removing item from Krati idempotent repository. This exception is ignored.", e);
return false;
}
}
Modified: camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/serializer/KratiDefaultSerializer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/serializer/KratiDefaultSerializer.java?rev=1464963&r1=1464962&r2=1464963&view=diff
==============================================================================
--- camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/serializer/KratiDefaultSerializer.java (original)
+++ camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/serializer/KratiDefaultSerializer.java Fri Apr 5 13:09:04 2013
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.camel.component.krati.serializer;
import java.io.ByteArrayInputStream;
@@ -77,6 +76,7 @@ public class KratiDefaultSerializer<T ex
return null;
}
+ // TODO: should use Camel's ClassResolver for classloading
ByteArrayInputStream bais = new ByteArrayInputStream(binary);
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
try {
Modified: camel/trunk/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java?rev=1464963&r1=1464962&r2=1464963&view=diff
==============================================================================
--- camel/trunk/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java (original)
+++ camel/trunk/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java Fri Apr 5 13:09:04 2013
@@ -17,6 +17,8 @@
package org.apache.camel.component.krati;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
@@ -48,6 +50,25 @@ public class KratiProducerTest extends C
}
@Test
+ public void testPutAndGetPreserveHeaders() throws InterruptedException {
+ ProducerTemplate template = context.createProducerTemplate();
+ template.sendBodyAndHeader("direct:put", new ValueObject("TEST1"), KratiConstants.KEY, new KeyObject("1"));
+ template.sendBodyAndHeader("direct:put", new ValueObject("TEST2"), KratiConstants.KEY, new KeyObject("2"));
+ template.sendBodyAndHeader("direct:put", new ValueObject("TEST3"), KratiConstants.KEY, new KeyObject("3"));
+
+ Exchange out = template.send("direct:get", new Processor() {
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ exchange.getIn().setHeader("foo", 123);
+ exchange.getIn().setHeader(KratiConstants.KEY, new KeyObject("3"));
+ }
+ });
+ assertNotNull(out);
+ assertEquals(123, out.getOut().getHeader("foo"));
+ assertEquals(new ValueObject("TEST3"), out.getOut().getBody());
+ }
+
+ @Test
public void testPutDeleteAndGet() throws InterruptedException {
ProducerTemplate template = context.createProducerTemplate();
template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY, "1");