You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by GitBox <gi...@apache.org> on 2022/10/05 20:04:06 UTC

[GitHub] [camel] davsclaus commented on a diff in pull request #8452: CAMEL-10173: camel-etcd3 - Add an implementation based on jetcd

davsclaus commented on code in PR #8452:
URL: https://github.com/apache/camel/pull/8452#discussion_r985757732


##########
components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/Etcd3Producer.java:
##########
@@ -47,73 +78,151 @@ class Etcd3Producer extends DefaultProducer {
     }
 
     @Override
-    public void process(Exchange exchange) throws Exception {
-        String action = exchange.getIn().getHeader(Etcd3Constants.ETCD_ACTION, String.class);
-        String targetPath = exchange.getIn().getHeader(Etcd3Constants.ETCD_PATH, String.class);
-        if (targetPath == null) {
-            targetPath = path;
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            String action = exchange.getIn().getHeader(Etcd3Constants.ETCD_ACTION, String.class);
+            String targetPath = exchange.getIn().getHeader(Etcd3Constants.ETCD_PATH, String.class);
+            if (targetPath == null) {
+                targetPath = path;
+            }
+
+            StringHelper.notEmpty(targetPath, Etcd3Constants.ETCD_PATH);
+            StringHelper.notEmpty(action, Etcd3Constants.ETCD_ACTION);
+
+            switch (action) {
+                case Etcd3Constants.ETCD_KEYS_ACTION_SET:
+                    processSetAsync(targetPath, exchange, callback);
+                    break;
+                case Etcd3Constants.ETCD_KEYS_ACTION_GET:
+                    processGetAsync(targetPath, exchange, callback);
+                    break;
+                case Etcd3Constants.ETCD_KEYS_ACTION_DELETE:
+                    processDelAsync(targetPath, exchange, callback);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown action " + action);
+            }
+        } catch (Exception e) {
+            exchange.setException(e);
+            callback.done(true);
+            return true;
         }
+        return false;
+    }
 
-        StringHelper.notEmpty(targetPath, Etcd3Constants.ETCD_PATH);
-        StringHelper.notEmpty(action, Etcd3Constants.ETCD_ACTION);
-
-        switch (action) {
-            case Etcd3Constants.ETCD_KEYS_ACTION_SET:
-                processSet(targetPath, exchange);
-                break;
-            case Etcd3Constants.ETCD_KEYS_ACTION_GET:
-                processGet(targetPath, exchange);
-                break;
-            case Etcd3Constants.ETCD_KEYS_ACTION_DELETE:
-                processDel(targetPath, exchange);
-                break;
-            default:
-                throw new IllegalArgumentException("Unknown action " + action);
+    @Override
+    protected void doStop() throws Exception {
+        try {
+            client.close();
+        } finally {
+            super.doStop();
         }
     }
 
-    private void processDel(String targetPath, Exchange exchange) throws Exception {
-        exchange.getIn().setBody(
+    /**
+     * Add actions to perform once the given future is complete.
+     *
+     * @param future   the future to complete with specific actions.
+     * @param exchange the exchange into which the result of the future (response or exception) is set.
+     * @param callback the callback to call once the future is done.
+     * @param <T>      the result type returned by the future.
+     */
+    private <T> void onComplete(CompletableFuture<T> future, Exchange exchange, AsyncCallback callback) {
+        future.thenAccept(r -> exchange.getIn().setBody(r))
+                .whenComplete(
+                        (r, e) -> {
+                            try {
+                                if (e != null) {
+                                    exchange.setException(new CamelException(e));

Review Comment:
   You can set a CamelExchangeException here, since you have access to the exchange that caused this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@camel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org