You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@plc4x.apache.org by sr...@apache.org on 2017/12/28 12:03:36 UTC
[incubator-plc4x] 01/02: Cleanup camel component: - Fixed shutdown
- Async call in test - Added path variable for driver
This is an automated email from the ASF dual-hosted git repository.
sruehl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit a667e0ec643a720a3fc010fdb991b2e89a94d6f4
Author: Sebastian Rühl <se...@codecentric.de>
AuthorDate: Thu Dec 21 11:49:27 2017 +0100
Cleanup camel component:
- Fixed shutdown
- Async call in test
- Added path variable for driver
---
.../java/org/apache/plc4x/camel/PLC4XEndpoint.java | 9 ++++
.../java/org/apache/plc4x/camel/PLC4XProducer.java | 57 +++++++++++-----------
.../org/apache/plc4x/camel/PLC4XComponentTest.java | 6 ++-
3 files changed, 41 insertions(+), 31 deletions(-)
diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XEndpoint.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XEndpoint.java
index dd800e1..4fd84e4 100644
--- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XEndpoint.java
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XEndpoint.java
@@ -32,10 +32,19 @@ import org.apache.plc4x.java.PlcDriverManager;
public class PLC4XEndpoint extends DefaultEndpoint {
/**
+ * The name 0f the PLC4X driver
+ */
+ @UriPath
+ @Metadata(required = "true")
+ @SuppressWarnings("unused")
+ String driver;
+
+ /**
* The address for the PLC4X driver
*/
@UriPath
@Metadata(required = "true")
+ @SuppressWarnings("unused")
String address;
final PlcDriverManager plcDriverManager;
diff --git a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XProducer.java b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XProducer.java
index 29331d2..e11d1e5 100644
--- a/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XProducer.java
+++ b/integrations/apache-camel/src/main/java/org/apache/plc4x/camel/PLC4XProducer.java
@@ -21,22 +21,22 @@ package org.apache.plc4x.camel;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
-import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.impl.DefaultAsyncProducer;
-import org.apache.camel.spi.ShutdownAware;
import org.apache.plc4x.java.api.connection.PlcConnection;
import org.apache.plc4x.java.api.connection.PlcWriter;
import org.apache.plc4x.java.api.exceptions.PlcException;
import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
import org.apache.plc4x.java.api.model.Address;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class PLC4XProducer extends DefaultAsyncProducer implements ShutdownAware {
- private static final Logger LOG = LoggerFactory.getLogger(PLC4XProducer.class);
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+public class PLC4XProducer extends DefaultAsyncProducer {
+ @SuppressWarnings("unused")
private PLC4XEndpoint endpoint;
private PlcConnection plcConnection;
+ private AtomicInteger openRequests;
public PLC4XProducer(PLC4XEndpoint endpoint) {
super(endpoint);
@@ -47,6 +47,7 @@ public class PLC4XProducer extends DefaultAsyncProducer implements ShutdownAware
} catch (PlcException e) {
throw new RuntimeException(e);
}
+ openRequests = new AtomicInteger();
}
@SuppressWarnings("unchecked")
@@ -57,13 +58,21 @@ public class PLC4XProducer extends DefaultAsyncProducer implements ShutdownAware
Object value = in.getBody(Object.class);
PlcWriteRequest plcSimpleWriteRequest = new PlcWriteRequest(datatype, address, value);
PlcWriter plcWriter = plcConnection.getWriter().orElseThrow(() -> new IllegalArgumentException("Writer for driver not found"));
- Object response = plcWriter.write(plcSimpleWriteRequest).get();
- if (exchange.getPattern().isOutCapable()) {
- Message out = exchange.getOut();
- out.copyFrom(exchange.getIn());
- out.setBody(response);
- } else {
- in.setBody(response);
+ CompletableFuture<PlcWriteResponse> completableFuture = plcWriter.write(plcSimpleWriteRequest);
+ int currentlyOpenRequests = openRequests.incrementAndGet();
+ try {
+ log.debug("Currently open requests including {}:{}", exchange, currentlyOpenRequests);
+ PlcWriteResponse plcWriteResponse = completableFuture.get();
+ if (exchange.getPattern().isOutCapable()) {
+ Message out = exchange.getOut();
+ out.copyFrom(exchange.getIn());
+ out.setBody(plcWriteResponse);
+ } else {
+ in.setBody(plcWriteResponse);
+ }
+ } finally {
+ int openRequestsAfterFinish = openRequests.decrementAndGet();
+ log.trace("Open Requests after {}:{}", exchange, openRequestsAfterFinish);
}
}
@@ -81,27 +90,17 @@ public class PLC4XProducer extends DefaultAsyncProducer implements ShutdownAware
}
@Override
- public boolean deferShutdown(ShutdownRunningTask shutdownRunningTask) {
- switch (shutdownRunningTask) {
- case CompleteCurrentTaskOnly:
- break;
- case CompleteAllTasks:
- break;
+ protected void doStop() throws Exception {
+ int openRequestsAtStop = openRequests.get();
+ log.debug("Stopping with {} open requests", openRequestsAtStop);
+ if (openRequestsAtStop > 0) {
+ log.warn("There are still {} open requests", openRequestsAtStop);
}
try {
plcConnection.close();
} catch (Exception ignore) {
}
- return false;
+ super.doStop();
}
- @Override
- public int getPendingExchangesSize() {
- return 0;
- }
-
- @Override
- public void prepareShutdown(boolean suspendOnly, boolean forced) {
-
- }
}
diff --git a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XComponentTest.java b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XComponentTest.java
index cc3ae5f..0e3e8ea 100644
--- a/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XComponentTest.java
+++ b/integrations/apache-camel/src/test/java/org/apache/plc4x/camel/PLC4XComponentTest.java
@@ -25,6 +25,8 @@ import org.apache.plc4x.java.s7.model.S7Address;
import org.apache.plc4x.java.s7.netty.model.types.MemoryArea;
import org.junit.Test;
+import java.util.concurrent.TimeUnit;
+
public class PLC4XComponentTest extends CamelTestSupport {
@Test
@@ -32,9 +34,9 @@ public class PLC4XComponentTest extends CamelTestSupport {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMinimumMessageCount(1);
- template.sendBody("direct:plc4x", 3);
+ template.asyncSendBody("direct:plc4x", "irrelevant");
- assertMockEndpointsSatisfied();
+ assertMockEndpointsSatisfied(2, TimeUnit.SECONDS);
}
@Override
--
To stop receiving notification emails like this one, please contact
"commits@plc4x.apache.org" <co...@plc4x.apache.org>.