You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2019/03/13 09:02:46 UTC

[camel] branch camel-2.x updated: [CAMEL-13263] Lenient IPFS connection check on startup

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch camel-2.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.x by this push:
     new e45d49c  [CAMEL-13263] Lenient IPFS connection check on startup
e45d49c is described below

commit e45d49c76d22f22ff945f04fe41d37d164fe962e
Author: Thomas Diesler <td...@redhat.com>
AuthorDate: Tue Feb 26 14:56:25 2019 +0100

    [CAMEL-13263] Lenient IPFS connection check on startup
---
 .../apache/camel/component/ipfs/IPFSComponent.java | 16 -------
 .../apache/camel/component/ipfs/IPFSEndpoint.java  | 53 ++++++++++++++++----
 .../camel/component/ipfs/SimpleIPFSTest.java       | 56 ++++++++--------------
 3 files changed, 64 insertions(+), 61 deletions(-)

diff --git a/components/camel-ipfs/src/main/java/org/apache/camel/component/ipfs/IPFSComponent.java b/components/camel-ipfs/src/main/java/org/apache/camel/component/ipfs/IPFSComponent.java
index e8a260b..37a087b 100644
--- a/components/camel-ipfs/src/main/java/org/apache/camel/component/ipfs/IPFSComponent.java
+++ b/components/camel-ipfs/src/main/java/org/apache/camel/component/ipfs/IPFSComponent.java
@@ -19,16 +19,11 @@ package org.apache.camel.component.ipfs;
 import java.net.URI;
 import java.util.Map;
 
-import io.nessus.ipfs.client.DefaultIPFSClient;
-import io.nessus.ipfs.client.IPFSClient;
-
 import org.apache.camel.Endpoint;
 import org.apache.camel.impl.DefaultComponent;
 
 public class IPFSComponent extends DefaultComponent {
 
-    private IPFSClient client;
-
     @Override
     protected Endpoint createEndpoint(String urispec, String remaining, Map<String, Object> params) throws Exception {
 
@@ -53,17 +48,6 @@ public class IPFSComponent extends DefaultComponent {
         }
         config.setIpfsCmd(cmd);
 
-        client = createClient(config);
-
         return new IPFSEndpoint(urispec, this, config);
     }
-
-    public IPFSClient getIPFSClient() {
-        return client;
-    }
-
-    private synchronized IPFSClient createClient(IPFSConfiguration config) {
-        IPFSClient ipfsClient = new DefaultIPFSClient(config.getIpfsHost(), config.getIpfsPort());
-        return ipfsClient.connect();
-    }
 }
diff --git a/components/camel-ipfs/src/main/java/org/apache/camel/component/ipfs/IPFSEndpoint.java b/components/camel-ipfs/src/main/java/org/apache/camel/component/ipfs/IPFSEndpoint.java
index 5f14518..3abbf50 100644
--- a/components/camel-ipfs/src/main/java/org/apache/camel/component/ipfs/IPFSEndpoint.java
+++ b/components/camel-ipfs/src/main/java/org/apache/camel/component/ipfs/IPFSEndpoint.java
@@ -27,7 +27,9 @@ import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import io.ipfs.multihash.Multihash;
+import io.nessus.ipfs.client.DefaultIPFSClient;
 import io.nessus.ipfs.client.IPFSClient;
+import io.nessus.ipfs.client.IPFSException;
 
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
@@ -36,6 +38,8 @@ import org.apache.camel.component.ipfs.IPFSConfiguration.IPFSCommand;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The camel-ipfs component provides access to the Interplanetary File System
@@ -44,16 +48,39 @@ import org.apache.camel.spi.UriParam;
 @UriEndpoint(firstVersion = "2.23.0", scheme = "ipfs", title = "IPFS", syntax = "ipfs:host:port/cmd", producerOnly = true, label = "file,ipfs")
 public class IPFSEndpoint extends DefaultEndpoint {
 
-    static long defaultTimeout = 10000L;
+    public static final long DEFAULT_TIMEOUT = 10000L;
+    
+    private static final Logger LOG = LoggerFactory.getLogger(IPFSComponent.class);
     
     @UriParam
-    private final IPFSConfiguration configuration;
+    private final IPFSConfiguration config;
+
+    private IPFSClient client;
 
-    public IPFSEndpoint(String uri, IPFSComponent component, IPFSConfiguration configuration) {
+    public IPFSEndpoint(String uri, IPFSComponent component, IPFSConfiguration config) {
         super(uri, component);
-        this.configuration = configuration;
+        this.config = config;
+        this.client = createClient(config);
+    }
+
+    public IPFSClient getIPFSClient() {
+        return client;
     }
 
+    public void setClient(IPFSClient client) {
+        this.client = client;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        try {
+            client.connect();
+        } catch (IPFSException ex) {
+            LOG.warn(ex.getMessage());
+        }
+    }
+    
     @Override
     public IPFSComponent getComponent() {
         return (IPFSComponent)super.getComponent();
@@ -75,11 +102,11 @@ public class IPFSEndpoint extends DefaultEndpoint {
     }
 
     IPFSConfiguration getConfiguration() {
-        return configuration;
+        return config;
     }
 
     IPFSCommand getCommand() {
-        String cmd = configuration.getIpfsCmd();
+        String cmd = config.getIpfsCmd();
         try {
             return IPFSCommand.valueOf(cmd);
         } catch (IllegalArgumentException ex) {
@@ -100,7 +127,7 @@ public class IPFSEndpoint extends DefaultEndpoint {
         Multihash mhash = Multihash.fromBase58(cid);
         Future<InputStream> future = ipfs().cat(mhash);
         try {
-            return future.get(defaultTimeout, TimeUnit.MILLISECONDS);
+            return future.get(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException ex) {
             throw new IOException("Cannot obtain: " + cid, ex);
         }
@@ -110,13 +137,21 @@ public class IPFSEndpoint extends DefaultEndpoint {
         Multihash mhash = Multihash.fromBase58(cid);
         Future<Path> future = ipfs().get(mhash, outdir);
         try {
-            return future.get(defaultTimeout, TimeUnit.MILLISECONDS);
+            return future.get(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
         } catch (InterruptedException | ExecutionException ex) {
             throw new IOException("Cannot obtain: " + cid, ex);
         }
     }
 
     private IPFSClient ipfs() {
-        return getComponent().getIPFSClient();
+        if (!client.hasConnection()) {
+            client.connect();
+        }
+        return client;
+    }
+    
+    private IPFSClient createClient(IPFSConfiguration config) {
+        IPFSClient ipfsClient = new DefaultIPFSClient(config.getIpfsHost(), config.getIpfsPort());
+        return ipfsClient;
     }
 }
diff --git a/components/camel-ipfs/src/test/java/org/apache/camel/component/ipfs/SimpleIPFSTest.java b/components/camel-ipfs/src/test/java/org/apache/camel/component/ipfs/SimpleIPFSTest.java
index 24dc252..82c3fa0 100644
--- a/components/camel-ipfs/src/test/java/org/apache/camel/component/ipfs/SimpleIPFSTest.java
+++ b/components/camel-ipfs/src/test/java/org/apache/camel/component/ipfs/SimpleIPFSTest.java
@@ -25,9 +25,6 @@ import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.List;
 
-import io.nessus.ipfs.client.DefaultIPFSClient;
-import io.nessus.ipfs.client.IPFSClient;
-import io.nessus.ipfs.client.IPFSException;
 import io.nessus.utils.StreamUtils;
 
 import org.apache.camel.CamelContext;
@@ -36,24 +33,10 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.junit.Assert;
 import org.junit.Assume;
-import org.junit.Before;
 import org.junit.Test;
 
 public class SimpleIPFSTest {
 
-    IPFSClient ipfs;
-    
-    @Before
-    public void before() {
-        ipfs = new DefaultIPFSClient("127.0.0.1", 5001);
-        try {
-            ipfs.connect();
-        } catch (IPFSException ex) {
-            // ignore
-        }
-        Assume.assumeTrue(ipfs.hasConnection());
-    }
-    
     @Test
     public void ipfsVersion() throws Exception {
 
@@ -68,8 +51,8 @@ public class SimpleIPFSTest {
         });
 
         camelctx.start();
-        assumeIPFS(camelctx);
-
+        assumeIPFSAvailable(camelctx);
+        
         try {
             ProducerTemplate producer = camelctx.createProducerTemplate();
             String resA = producer.requestBody("direct:startA", null, String.class);
@@ -96,12 +79,11 @@ public class SimpleIPFSTest {
             }
         });
 
-        Path path = Paths.get("src/test/resources/html/index.html");
-
         camelctx.start();
-        assumeIPFS(camelctx);
-
+        assumeIPFSAvailable(camelctx);
+        
         try {
+            Path path = Paths.get("src/test/resources/html/index.html");
             ProducerTemplate producer = camelctx.createProducerTemplate();
             String res = producer.requestBody("direct:start", path, String.class);
             Assert.assertEquals(hash, res);
@@ -124,12 +106,11 @@ public class SimpleIPFSTest {
             }
         });
 
-        Path path = Paths.get("src/test/resources/html");
-
         camelctx.start();
-        assumeIPFS(camelctx);
-
+        assumeIPFSAvailable(camelctx);
+        
         try {
+            Path path = Paths.get("src/test/resources/html");
             ProducerTemplate producer = camelctx.createProducerTemplate();
             List<String> res = producer.requestBody("direct:start", path, List.class);
             Assert.assertEquals(10, res.size());
@@ -153,8 +134,8 @@ public class SimpleIPFSTest {
         });
 
         camelctx.start();
-        assumeIPFS(camelctx);
-
+        assumeIPFSAvailable(camelctx);
+        
         try {
             ProducerTemplate producer = camelctx.createProducerTemplate();
             InputStream res = producer.requestBody("direct:start", hash, InputStream.class);
@@ -178,8 +159,8 @@ public class SimpleIPFSTest {
         });
 
         camelctx.start();
-        assumeIPFS(camelctx);
-
+        assumeIPFSAvailable(camelctx);
+        
         try {
             ProducerTemplate producer = camelctx.createProducerTemplate();
             Path res = producer.requestBody("direct:start", hash, Path.class);
@@ -204,8 +185,8 @@ public class SimpleIPFSTest {
         });
 
         camelctx.start();
-        assumeIPFS(camelctx);
-
+        assumeIPFSAvailable(camelctx);
+        
         try {
             ProducerTemplate producer = camelctx.createProducerTemplate();
             Path res = producer.requestBody("direct:start", hash, Path.class);
@@ -223,8 +204,11 @@ public class SimpleIPFSTest {
         Assert.assertEquals("The quick brown fox jumps over the lazy dog.", new String(baos.toByteArray()));
     }
 
-    private void assumeIPFS(CamelContext camelctx) {
-        IPFSComponent comp = camelctx.getComponent("ipfs", IPFSComponent.class);
-        Assume.assumeTrue(comp.getIPFSClient().hasConnection());
+    private void assumeIPFSAvailable(CamelContext camelctx) throws Exception {
+        IPFSEndpoint ipfsEp = camelctx.getEndpoints().stream()
+                .filter(ep -> ep instanceof IPFSEndpoint)
+                .map(ep -> (IPFSEndpoint)ep)
+                .findFirst().get();
+        Assume.assumeTrue(ipfsEp.getIPFSClient().hasConnection());
     }
 }