You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@edgent.apache.org by dj...@apache.org on 2016/03/10 00:59:12 UTC
[4/5] incubator-quarks git commit: Add tests for IotDevicePubSub
application and working setup
Add tests for IotDevicePubSub application and working setup
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/afa455ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/afa455ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/afa455ab
Branch: refs/heads/master
Commit: afa455aba142d5e9f08cb53824e8dbba318f0fd3
Parents: b412625
Author: Dan Debrunner <dj...@apache.org>
Authored: Tue Mar 8 10:12:24 2016 -0800
Committer: Daniel J. Debrunner <de...@us.ibm.com>
Committed: Wed Mar 9 15:36:02 2016 -0800
----------------------------------------------------------------------
apps/.classpath | 1 +
apps/.gitignore | 5 +
apps/iot/build.xml | 2 +-
.../java/quarks/apps/iot/IotDevicePubSub.java | 158 ++++++++-----------
.../java/quarks/apps/iot/ProxyIotDevice.java | 75 +++++----
.../quarks/test/apps/iot/EchoIotDevice.java | 128 +++++++++++++++
.../test/apps/iot/IotDevicePubSubTest.java | 88 +++++++++++
7 files changed, 328 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/afa455ab/apps/.classpath
----------------------------------------------------------------------
diff --git a/apps/.classpath b/apps/.classpath
index a44dd95..5e6e513 100644
--- a/apps/.classpath
+++ b/apps/.classpath
@@ -1,6 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="iot/src/main/java"/>
+ <classpathentry kind="src" path="iot/src/test/java"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry combineaccessrules="false" kind="src" path="/ext"/>
<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/afa455ab/apps/.gitignore
----------------------------------------------------------------------
diff --git a/apps/.gitignore b/apps/.gitignore
new file mode 100644
index 0000000..948fe44
--- /dev/null
+++ b/apps/.gitignore
@@ -0,0 +1,5 @@
+/classes/
+**/test.classes/
+**/classes/
+**/unittests/
+/bin/
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/afa455ab/apps/iot/build.xml
----------------------------------------------------------------------
diff --git a/apps/iot/build.xml b/apps/iot/build.xml
index 0ce57d3..838531b 100644
--- a/apps/iot/build.xml
+++ b/apps/iot/build.xml
@@ -16,7 +16,7 @@
<path id="test.compile.classpath">
<pathelement location="${jar}" />
- <pathelement location="../../api/topology/test.classes" />
+ <pathelement location="${quarks.lib}/quarks.providers.direct.jar" />
<path refid="compile.classpath"/>
</path>
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/afa455ab/apps/iot/src/main/java/quarks/apps/iot/IotDevicePubSub.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/quarks/apps/iot/IotDevicePubSub.java b/apps/iot/src/main/java/quarks/apps/iot/IotDevicePubSub.java
index 481e74e..d0ddb47 100644
--- a/apps/iot/src/main/java/quarks/apps/iot/IotDevicePubSub.java
+++ b/apps/iot/src/main/java/quarks/apps/iot/IotDevicePubSub.java
@@ -30,82 +30,67 @@ import quarks.topology.TStream;
import quarks.topology.TopologyElement;
/**
- * Application sharing an {@code IotDevice}
- * through publish-subscribe.
- * <BR>
- * This application allows sharing an {@link IotDevice}
- * across multiple running jobs. This allows a single
- * connection to a back-end message hub to be shared across
- * multiple independent applications, without having to
- * build a single topology.
+ * Application sharing an {@code IotDevice} through publish-subscribe. <BR>
+ * This application allows sharing an {@link IotDevice} across multiple running
+ * jobs. This allows a single connection to a back-end message hub to be shared
+ * across multiple independent applications, without having to build a single
+ * topology.
* <P>
- * Applications coded to {@link IotDevice}
- * obtain a topology specific {@code IotDevice} using
- * {@link #addIotDevice(TopologyElement)}. This returned
- * device will route events and commands to/from the
- * actual message hub {@code IotDevice} through
- * publish-subscribe.
+ * Applications coded to {@link IotDevice} obtain a topology specific
+ * {@code IotDevice} using {@link #addIotDevice(TopologyElement)}. This returned
+ * device will route events and commands to/from the actual message hub
+ * {@code IotDevice} through publish-subscribe.
* <P>
- * An instance of this application is created by creating
- * a new topology and then creating a {@link IotDevice}
- * specific to the desired message hub. Then the application
- * is created by calling {@link #IotDevicePubSub(IotDevice)}
- * passing the {@code IotDevice}.
- * <BR>
- * Then additional independent applications (topologies) can be created
- * and they create a proxy {@code IotDevice} for their topology using
- * {@link #addIotDevice(TopologyElement)}. This proxy {@code IotDevice}
- * is then used to send device events and receive device commands in
- * that topology.
- * <BR>
+ * An instance of this application is created by creating a new topology and
+ * then creating a {@link IotDevice} specific to the desired message hub. Then
+ * the application is created by calling {@link #IotDevicePubSub(IotDevice)}
+ * passing the {@code IotDevice}. <BR>
+ * Then additional independent applications (topologies) can be created and they
+ * create a proxy {@code IotDevice} for their topology using
+ * {@link #addIotDevice(TopologyElement)}. This proxy {@code IotDevice} is then
+ * used to send device events and receive device commands in that topology. <BR>
* Once all the topologies have been declared they can be submitted.
* </P>
* <P>
* Limitations:
* <UL>
- * <LI>
- * Subscribing to all device commands (passing no arguments to {@link IotDevice#commands(String...)} is
- * not supported by the proxy {@code IotDevice}.
- * </LI>
- * <LI>
- * All applications that subscribe to device commands must be declared before
- * the instance of this application is submitted.
- * </LI>
+ * <LI>Subscribing to all device commands (passing no arguments to
+ * {@link IotDevice#commands(String...)} is not supported by the proxy
+ * {@code IotDevice}.</LI>
+ * <LI>All applications that subscribe to device commands must be declared
+ * before the instance of this application is submitted.</LI>
* </UL>
* </P>
*/
public class IotDevicePubSub {
-
- /**
- * Events published to topic {@value} are sent as device
- * events using the actual message hub {@link IotDevice}.
- * <BR>
- * it is recommended applications use the {@code IotDevice}
- * returned by {@link #addIotDevice(TopologyElement)} to
- * send events rather than publishing streams to this
- * topic.
- */
+
+ /**
+ * Events published to topic {@value} are sent as device events using the
+ * actual message hub {@link IotDevice}. <BR>
+ * it is recommended applications use the {@code IotDevice} returned by
+ * {@link #addIotDevice(TopologyElement)} to send events rather than
+ * publishing streams to this topic.
+ */
public static final String EVENTS = "quarks/iot/events";
-
- /**
- * Device commands are published to {@code quarks/iot/command/commandId}
- * by this application.
- * <BR>
- * it is recommended applications use the {@code IotDevice}
- * returned by {@link #addIotDevice(TopologyElement)} to
- * send events rather than subscribing to streams with this
- * topic prefix.
- */
+
+ /**
+ * Device commands are published to {@code quarks/iot/command/commandId} by
+ * this application. <BR>
+ * it is recommended applications use the {@code IotDevice} returned by
+ * {@link #addIotDevice(TopologyElement)} to send events rather than
+ * subscribing to streams with this topic prefix.
+ */
public static final String COMMANDS_PREFIX = "quarks/iot/command/";
-
private final IotDevice device;
private final Set<String> publishedCommandTopics = new HashSet<>();
-
+
/**
- * Create an instance of this application using {@code device}
- * as the device connection to a message hub.
- * @param device Device to a message hub.
+ * Create an instance of this application using {@code device} as the device
+ * connection to a message hub.
+ *
+ * @param device
+ * Device to a message hub.
*/
public IotDevicePubSub(IotDevice device) {
this.device = device;
@@ -113,60 +98,53 @@ public class IotDevicePubSub {
}
/**
- * Add a proxy {@code IotDevice} for the topology
- * containing {@code te}.
+ * Add a proxy {@code IotDevice} for the topology containing {@code te}.
* <P>
- * Any events sent through the returned device
- * are sent onto the message hub device through
- * publish-subscribe.
- * <BR>
- * Subscribing to commands using the returned
- * device will subscribe to commands received
- * by the message hub device.
+ * Any events sent through the returned device are sent onto the message hub
+ * device through publish-subscribe. <BR>
+ * Subscribing to commands using the returned device will subscribe to
+ * commands received by the message hub device.
* </P>
*
- * @param te Topology the returned device is contained in.
+ * @param te
+ * Topology the returned device is contained in.
* @return Proxy device.
*/
- public IotDevice addIotDevice(TopologyElement te) {
+ public IotDevice addIotDevice(TopologyElement te) {
return new ProxyIotDevice(this, te.topology());
}
-
+
/**
- * Create initial application subscribing to
- * events and delivering them to the message
- * hub IotDevice.
+ * Create initial application subscribing to events and delivering them to
+ * the message hub IotDevice.
*
*/
private void createApplication() {
-
- TStream<JsonObject> events = PublishSubscribe.subscribe(device,
- EVENTS, JsonObject.class);
-
- device.events(events,
- ew -> ew.getAsJsonPrimitive("eventId").getAsString(),
- ew -> ew.getAsJsonObject("event"),
+
+ TStream<JsonObject> events = PublishSubscribe.subscribe(device, EVENTS, JsonObject.class);
+
+ device.events(events, ew -> ew.getAsJsonPrimitive("eventId").getAsString(), ew -> ew.getAsJsonObject("event"),
ew -> ew.getAsJsonPrimitive("qos").getAsInt());
}
-
+
/**
- * Subscribe to device commands.
- * Keep track of which command identifiers are subscribed to
- * so the stream for a command identifier is only created once.
+ * Subscribe to device commands. Keep track of which command identifiers are
+ * subscribed to so the stream for a command identifier is only created
+ * once.
*
* @param commandIdentifier
- * @return Topic that needs to be subscribed to in the
- * subscriber application.
+ * @return Topic that needs to be subscribed to in the subscriber
+ * application.
*/
synchronized String subscribeToCommand(String commandIdentifier) {
-
+
String topic = COMMANDS_PREFIX + commandIdentifier;
-
+
if (!publishedCommandTopics.contains(topic)) {
TStream<JsonObject> systemStream = device.commands(commandIdentifier);
PublishSubscribe.publish(systemStream, topic, JsonObject.class);
}
-
+
return topic;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/afa455ab/apps/iot/src/main/java/quarks/apps/iot/ProxyIotDevice.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/main/java/quarks/apps/iot/ProxyIotDevice.java b/apps/iot/src/main/java/quarks/apps/iot/ProxyIotDevice.java
index 0c3b185..bfbf229 100644
--- a/apps/iot/src/main/java/quarks/apps/iot/ProxyIotDevice.java
+++ b/apps/iot/src/main/java/quarks/apps/iot/ProxyIotDevice.java
@@ -32,20 +32,22 @@ import quarks.topology.TStream;
import quarks.topology.Topology;
/**
- * Proxy IotDevice that uses publish-subscribe
- * and IotDevicePubSub application to communicate
- * with a single IotDevice connected to a message hub.
+ * Proxy IotDevice that uses publish-subscribe and IotDevicePubSub application
+ * to communicate with a single IotDevice connected to a message hub.
*/
class ProxyIotDevice implements IotDevice {
-
- private IotDevicePubSub app;
-
+
+ private IotDevicePubSub app;
+
private final Topology topology;
-
+
/**
* Create a proxy IotDevice
- * @param app IotDevicePubSub application hosting the actual IotDevice.
- * @param topology Topology of the subscribing application.
+ *
+ * @param app
+ * IotDevicePubSub application hosting the actual IotDevice.
+ * @param topology
+ * Topology of the subscribing application.
*/
ProxyIotDevice(IotDevicePubSub app, Topology topology) {
this.app = app;
@@ -58,60 +60,57 @@ class ProxyIotDevice implements IotDevice {
}
/**
- * Publishes events derived from {@code stream} using the
- * topic {@link IotDevicePubSub#EVENTS} as a JsonObject
- * containing eventId, event, and qos keys.
+ * Publishes events derived from {@code stream} using the topic
+ * {@link IotDevicePubSub#EVENTS} as a JsonObject containing eventId, event,
+ * and qos keys.
*/
@Override
public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
-
- stream = stream.map(event ->
- {
- JsonObject publishedEvent = new JsonObject();
-
- publishedEvent.addProperty("eventId", eventId.apply(event));
- publishedEvent.add("event", payload.apply(event));
- publishedEvent.addProperty("qos", qos.apply(event));
-
- return publishedEvent;
- });
-
+
+ stream = stream.map(event -> {
+ JsonObject publishedEvent = new JsonObject();
+
+ publishedEvent.addProperty("eventId", eventId.apply(event));
+ publishedEvent.add("event", payload.apply(event));
+ publishedEvent.addProperty("qos", qos.apply(event));
+
+ return publishedEvent;
+ });
+
return PublishSubscribe.publish(stream, IotDevicePubSub.EVENTS, JsonObject.class);
}
/**
- * Publishes events derived from {@code stream} using the
- * topic {@link IotDevicePubSub#EVENTS} as a JsonObject
- * containing eventId, event, and qos keys.
+ * Publishes events derived from {@code stream} using the topic
+ * {@link IotDevicePubSub#EVENTS} as a JsonObject containing eventId, event,
+ * and qos keys.
*/
@Override
public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
- stream = stream.map(event ->
- {
+ stream = stream.map(event -> {
JsonObject publishedEvent = new JsonObject();
-
+
publishedEvent.addProperty("eventId", eventId);
publishedEvent.add("event", event);
publishedEvent.addProperty("qos", qos);
-
+
return publishedEvent;
});
-
+
return PublishSubscribe.publish(stream, IotDevicePubSub.EVENTS, JsonObject.class);
}
-
+
/**
- * Subscribes to commands.
- * Doesn't yet support subscribing to all commands.
+ * Subscribes to commands. Doesn't yet support subscribing to all commands.
*/
@Override
public TStream<JsonObject> commands(String... commandIdentifiers) {
-
+
final String firstTopic = app.subscribeToCommand(commandIdentifiers[0]);
TStream<JsonObject> commandsStream = PublishSubscribe.subscribe(this, firstTopic, JsonObject.class);
-
+
if (commandIdentifiers.length > 1) {
Set<TStream<JsonObject>> additionalStreams = new HashSet<>();
for (int i = 1; i < commandIdentifiers.length; i++) {
@@ -120,7 +119,7 @@ class ProxyIotDevice implements IotDevice {
}
commandsStream = commandsStream.union(additionalStreams);
}
-
+
return commandsStream;
}
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/afa455ab/apps/iot/src/test/java/quarks/test/apps/iot/EchoIotDevice.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/test/java/quarks/test/apps/iot/EchoIotDevice.java b/apps/iot/src/test/java/quarks/test/apps/iot/EchoIotDevice.java
new file mode 100644
index 0000000..fc91668
--- /dev/null
+++ b/apps/iot/src/test/java/quarks/test/apps/iot/EchoIotDevice.java
@@ -0,0 +1,128 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package quarks.test.apps.iot;
+
+import static quarks.function.Functions.alwaysTrue;
+import static quarks.function.Functions.discard;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.gson.JsonObject;
+
+import quarks.connectors.iot.IotDevice;
+import quarks.function.Function;
+import quarks.function.UnaryOperator;
+import quarks.topology.TSink;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+import quarks.topology.plumbing.PlumbingStreams;
+
+/**
+ * A test IotDevice that echos back every event as a command with command
+ * identifier equal to the {@code cmdId} value in the event payload. If {@code cmdId}
+ * is not set then {@code ec_eventId} is used.
+ *
+ */
+public class EchoIotDevice implements IotDevice {
+
+ public static final String EVENT_CMD_ID = "cmdId";
+
+ private final Topology topology;
+ private TStream<JsonObject> echoCmds;
+
+ public EchoIotDevice(Topology topology) {
+ this.topology = topology;
+ }
+
+ @Override
+ public Topology topology() {
+ return topology;
+ }
+
+ @Override
+ public TSink<JsonObject> events(TStream<JsonObject> stream, Function<JsonObject, String> eventId,
+ UnaryOperator<JsonObject> payload, Function<JsonObject, Integer> qos) {
+
+ stream = stream.map(e -> {
+ JsonObject c = new JsonObject();
+ JsonObject evPayload = payload.apply(e);
+ c.addProperty(CMD_ID, getCommandIdFromEvent(eventId.apply(e), evPayload));
+ c.add(CMD_PAYLOAD, evPayload);
+ c.addProperty(CMD_FORMAT, "json");
+ c.addProperty(CMD_TS, System.currentTimeMillis());
+ return c;
+ });
+
+ return handleEvents(stream);
+ }
+
+ private static String getCommandIdFromEvent(String eventId, JsonObject evPayload) {
+ if (evPayload.has(EVENT_CMD_ID))
+ return evPayload.getAsJsonPrimitive(EVENT_CMD_ID).getAsString();
+ else
+ return "ec_" + eventId;
+ }
+
+ @Override
+ public TSink<JsonObject> events(TStream<JsonObject> stream, String eventId, int qos) {
+
+ stream = stream.map(e -> {
+ JsonObject c = new JsonObject();
+ c.addProperty(CMD_ID, getCommandIdFromEvent(eventId, e));
+ c.add(CMD_PAYLOAD, e);
+ c.addProperty(CMD_FORMAT, "json");
+ c.addProperty(CMD_TS, System.currentTimeMillis());
+ return c;
+ });
+
+ return handleEvents(stream);
+ }
+
+ private TSink<JsonObject> handleEvents(TStream<JsonObject> stream) {
+
+ if (echoCmds == null)
+ echoCmds = PlumbingStreams.isolate(stream, true);
+ else
+ echoCmds = PlumbingStreams.isolate(stream.union(echoCmds), true);
+
+ return stream.sink(discard());
+ }
+
+ @Override
+ public TStream<JsonObject> commands(String... commands) {
+ if (commands.length == 0)
+ return echoCmds.filter(alwaysTrue());
+
+ TStream<JsonObject> cmd0 = echoCmds
+ .filter(j -> j.getAsJsonPrimitive(CMD_ID).getAsString().equals(commands[0]));
+
+ if (commands.length == 1)
+ return cmd0;
+
+ Set<TStream<JsonObject>> cmds = new HashSet<>();
+ for (int i = 1; i < commands.length; i++) {
+ final int idx = i;
+ cmds.add(echoCmds.filter(j -> j.getAsJsonPrimitive(CMD_ID).getAsString().equals(commands[idx])));
+ }
+
+ return cmd0.union(cmds);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/afa455ab/apps/iot/src/test/java/quarks/test/apps/iot/IotDevicePubSubTest.java
----------------------------------------------------------------------
diff --git a/apps/iot/src/test/java/quarks/test/apps/iot/IotDevicePubSubTest.java b/apps/iot/src/test/java/quarks/test/apps/iot/IotDevicePubSubTest.java
new file mode 100644
index 0000000..38555c4
--- /dev/null
+++ b/apps/iot/src/test/java/quarks/test/apps/iot/IotDevicePubSubTest.java
@@ -0,0 +1,88 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package quarks.test.apps.iot;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import com.google.gson.JsonObject;
+
+import quarks.apps.iot.IotDevicePubSub;
+import quarks.connectors.iot.IotDevice;
+import quarks.connectors.iot.QoS;
+import quarks.connectors.pubsub.service.ProviderPubSub;
+import quarks.connectors.pubsub.service.PublishSubscribeService;
+import quarks.execution.Job;
+import quarks.execution.Job.Action;
+import quarks.providers.direct.DirectProvider;
+import quarks.topology.TStream;
+import quarks.topology.Topology;
+import quarks.topology.plumbing.PlumbingStreams;
+import quarks.topology.tester.Condition;
+
+public class IotDevicePubSubTest {
+
+
+
+ @Test
+ public void testIotDevicePubSubApp() throws Exception {
+ DirectProvider dp = new DirectProvider();
+
+ dp.getServices().addService(PublishSubscribeService.class, new ProviderPubSub());
+
+ Topology iot = dp.newTopology("IotPubSub");
+ IotDevicePubSub iotApp = new IotDevicePubSub(new EchoIotDevice(iot));
+
+ Topology app1 = dp.newTopology("App1");
+
+ IotDevice app1Iot = iotApp.addIotDevice(app1);
+
+ TStream<String> data = app1.strings("A", "B", "C");
+
+ // Without this the tuple can be published and discarded before the
+ // subscriber is hooked up.
+ data = PlumbingStreams.blockingOneShotDelay(data, 500, TimeUnit.MILLISECONDS);
+
+ TStream<JsonObject> events = data.map(
+ s -> {JsonObject j = new JsonObject(); j.addProperty("v", s); return j;});
+ app1Iot.events(events, "ps1", QoS.FIRE_AND_FORGET);
+
+ TStream<JsonObject> echoedCmds = app1Iot.commands("ec_ps1");
+
+ TStream<String> ecs = echoedCmds.map(j -> j.getAsJsonObject(IotDevice.CMD_PAYLOAD).getAsJsonPrimitive("v").getAsString());
+ Condition<List<String>> tcEcho = app1.getTester().streamContents(ecs, "A", "B", "C"); // Expect all tuples
+
+ Job jIot = dp.submit(iot.topology()).get();
+ Job jApp = dp.submit(app1.topology()).get();
+
+ for (int i = 0; i < 50 && !tcEcho.valid(); i++) {
+ Thread.sleep(50);
+ }
+
+ assertTrue(tcEcho.getResult().toString(), tcEcho.valid());
+
+ jIot.stateChange(Action.CLOSE);
+ jApp.stateChange(Action.CLOSE);
+ }
+}