You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ch...@apache.org on 2007/08/15 17:45:22 UTC

svn commit: r566228 - in /activemq/camel/trunk/components/camel-activemq/src: main/java/org/apache/camel/component/activemq/JournalEndpoint.java test/java/org/apache/camel/component/activemq/JournalConfigureTest.java

Author: chirino
Date: Wed Aug 15 08:45:21 2007
New Revision: 566228

URL: http://svn.apache.org/viewvc?view=rev&rev=566228
Log:
Adding a configuration test for the activemq.journal endpoint.

Added:
    activemq/camel/trunk/components/camel-activemq/src/test/java/org/apache/camel/component/activemq/JournalConfigureTest.java   (with props)
Modified:
    activemq/camel/trunk/components/camel-activemq/src/main/java/org/apache/camel/component/activemq/JournalEndpoint.java

Modified: activemq/camel/trunk/components/camel-activemq/src/main/java/org/apache/camel/component/activemq/JournalEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-activemq/src/main/java/org/apache/camel/component/activemq/JournalEndpoint.java?view=diff&rev=566228&r1=566227&r2=566228
==============================================================================
--- activemq/camel/trunk/components/camel-activemq/src/main/java/org/apache/camel/component/activemq/JournalEndpoint.java (original)
+++ activemq/camel/trunk/components/camel-activemq/src/main/java/org/apache/camel/component/activemq/JournalEndpoint.java Wed Aug 15 08:45:21 2007
@@ -26,7 +26,6 @@
 import org.apache.activemq.util.ByteSequence;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Consumer;
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -38,63 +37,27 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-public class JournalEndpoint extends DefaultEndpoint<DefaultExchange> {
+public class JournalEndpoint extends DefaultEndpoint<Exchange> {
 
     private static final transient Log LOG = LogFactory.getLog(JournalEndpoint.class);
 
-    public final class JournalProducer extends DefaultProducer<DefaultExchange> {
-        private boolean syncWrite = JournalEndpoint.this.syncWrite;
-
-        public JournalProducer(Endpoint<DefaultExchange> endpoint) {
-            super(endpoint);
-        }
-
-        public void process(Exchange exchange) throws Exception {
-            incrementReference();
-            try {
-
-                ByteSequence body = exchange.getIn().getBody(ByteSequence.class);
-                if (body == null) {
-                    byte[] bytes = exchange.getIn().getBody(byte[].class);
-                    if (bytes != null) {
-                        body = new ByteSequence(bytes);
-                    }
-                }
-                if (body == null) {
-                    throw new CamelExchangeException("In body message could not be converted to a ByteSequence or a byte array.", exchange);
-                }
-                dataManager.write(body, true);
-
-            } finally {
-                decrementReference();
-            }
-        }
-
-        public boolean isSyncWrite() {
-            return syncWrite;
-        }
-
-        public void setSyncWrite(boolean syncWrite) {
-            this.syncWrite = syncWrite;
-        }
-    }
-
     private final File directory;
-    private final AtomicReference<DefaultConsumer<DefaultExchange>> consumer = new AtomicReference<DefaultConsumer<DefaultExchange>>();
+    private final AtomicReference<DefaultConsumer<Exchange>> consumer = new AtomicReference<DefaultConsumer<Exchange>>();
     private final Object activationMutex = new Object();
     private int referenceCount;
     private AsyncDataManager dataManager;
     private Thread thread;
     private Location lastReadLocation;
-    private boolean syncWrite = true;
     private long idleDelay = 1000;
+    private boolean syncProduce = true;
+    private boolean syncConsume;
 
     public JournalEndpoint(String uri, JournalComponent journalComponent, File directory) {
         super(uri, journalComponent.getCamelContext());
         this.directory = directory;
     }
 
-    public DefaultExchange createExchange() {
+    public Exchange createExchange() {
         return new DefaultExchange(getContext());
     }
 
@@ -106,8 +69,8 @@
         return directory;
     }
 
-    public Consumer<DefaultExchange> createConsumer(Processor processor) throws Exception {
-        return new DefaultConsumer<DefaultExchange>(this, processor) {
+    public Consumer<Exchange> createConsumer(Processor processor) throws Exception {
+        return new DefaultConsumer<Exchange>(this, processor) {
             @Override
             public void start() throws Exception {
                 super.start();
@@ -149,7 +112,7 @@
         }
     }
 
-    protected void deactivateConsumer(DefaultConsumer<DefaultExchange> consumer) throws IOException {
+    protected void deactivateConsumer(DefaultConsumer<Exchange> consumer) throws IOException {
         synchronized (activationMutex) {
             if (this.consumer.get() != consumer) {
                 throw new RuntimeCamelException("Consumer was not active.");
@@ -164,7 +127,7 @@
         }
     }
 
-    protected void activateConsumer(DefaultConsumer<DefaultExchange> consumer) throws IOException {
+    protected void activateConsumer(DefaultConsumer<Exchange> consumer) throws IOException {
         synchronized (activationMutex) {
             if (this.consumer.get() != null) {
                 throw new RuntimeCamelException("Consumer already active: journal endpoints only support 1 active consumer");
@@ -185,7 +148,7 @@
 
     protected void dispatchToConsumer() {
         try {
-            DefaultConsumer<DefaultExchange> consumer;
+            DefaultConsumer<Exchange> consumer;
             while ((consumer = this.consumer.get()) != null) {
                 // See if there is a new record to process
                 Location location = dataManager.getNextLocation(lastReadLocation);
@@ -193,7 +156,7 @@
 
                     // Send it on.
                     ByteSequence read = dataManager.read(location);
-                    DefaultExchange exchange = createExchange();
+                    Exchange exchange = createExchange();
                     exchange.getIn().setBody(read);
                     exchange.getIn().setHeader("journal", getEndpointUri());
                     exchange.getIn().setHeader("location", location);
@@ -205,7 +168,7 @@
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Consumed record at: " + location);
                     }
-                    dataManager.setMark(location, true);
+                    dataManager.setMark(location, syncConsume);
                     lastReadLocation = location;
                 } else {
                     // Avoid a tight CPU loop if there is no new record to read.
@@ -218,16 +181,45 @@
         }
     }
 
-    public Producer<DefaultExchange> createProducer() throws Exception {
-        return new JournalProducer(this);
+    public Producer<Exchange> createProducer() throws Exception {
+        return new DefaultProducer<Exchange>(this) {
+            public void process(Exchange exchange) throws Exception {
+                incrementReference();
+                try {
+
+                    ByteSequence body = exchange.getIn().getBody(ByteSequence.class);
+                    if (body == null) {
+                        byte[] bytes = exchange.getIn().getBody(byte[].class);
+                        if (bytes != null) {
+                            body = new ByteSequence(bytes);
+                        }
+                    }
+                    if (body == null) {
+                        throw new CamelExchangeException("In body message could not be converted to a ByteSequence or a byte array.", exchange);
+                    }
+                    dataManager.write(body, syncProduce);
+
+                } finally {
+                    decrementReference();
+                }
+            }
+        };
+    }
+
+    public boolean isSyncConsume() {
+        return syncConsume;
+    }
+
+    public void setSyncConsume(boolean syncConsume) {
+        this.syncConsume = syncConsume;
     }
 
-    public boolean isSyncWrite() {
-        return syncWrite;
+    public boolean isSyncProduce() {
+        return syncProduce;
     }
 
-    public void setSyncWrite(boolean syncWrite) {
-        this.syncWrite = syncWrite;
+    public void setSyncProduce(boolean syncProduce) {
+        this.syncProduce = syncProduce;
     }
 
 }

Added: activemq/camel/trunk/components/camel-activemq/src/test/java/org/apache/camel/component/activemq/JournalConfigureTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-activemq/src/test/java/org/apache/camel/component/activemq/JournalConfigureTest.java?view=auto&rev=566228
==============================================================================
--- activemq/camel/trunk/components/camel-activemq/src/test/java/org/apache/camel/component/activemq/JournalConfigureTest.java (added)
+++ activemq/camel/trunk/components/camel-activemq/src/test/java/org/apache/camel/component/activemq/JournalConfigureTest.java Wed Aug 15 08:45:21 2007
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.activemq;
+
+import java.io.File;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Endpoint;
+
+/**
+ * @version $Revision$
+ */
+public class JournalConfigureTest extends ContextTestSupport {
+    
+    public void testDefaltConfig() throws Exception {
+        JournalEndpoint endpoint = resolveMandatoryEndpoint("activemq.journal:test");        
+        assertEquals("directory", new File("test"), endpoint.getDirectory());
+        assertEquals("syncConsume", false, endpoint.isSyncConsume());
+        assertEquals("syncProduce", true, endpoint.isSyncProduce());
+    }
+
+    public void testConfigViaOptions() throws Exception {
+        JournalEndpoint endpoint = resolveMandatoryEndpoint("activemq.journal:test?syncConsume=true&syncProduce=false");        
+        assertEquals("directory", new File("test"), endpoint.getDirectory());
+        assertEquals("syncConsume", true, endpoint.isSyncConsume());
+        assertEquals("syncProduce", false, endpoint.isSyncProduce());
+    }
+
+    @Override
+    protected JournalEndpoint resolveMandatoryEndpoint(String uri) {
+        Endpoint endpoint = super.resolveMandatoryEndpoint(uri);
+        return assertIsInstanceOf(JournalEndpoint.class, endpoint);
+    }
+}
\ No newline at end of file

Propchange: activemq/camel/trunk/components/camel-activemq/src/test/java/org/apache/camel/component/activemq/JournalConfigureTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/components/camel-activemq/src/test/java/org/apache/camel/component/activemq/JournalConfigureTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date