You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/03/24 12:12:48 UTC

[activemq-artemis] branch master updated: ARTEMIS-3204 Fixing NPE on Counting Queue for Resource Limit

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d24bfa  ARTEMIS-3204 Fixing NPE on Counting Queue for Resource Limit
8d24bfa is described below

commit 8d24bfa6464bde552fb71ec7a35bb8dd449c4084
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Tue Mar 23 13:27:59 2021 -0400

    ARTEMIS-3204 Fixing NPE on Counting Queue for Resource Limit
---
 .../artemis/core/server/ActiveMQMessageBundle.java |   2 +-
 .../core/server/impl/ActiveMQServerImpl.java       |  22 +-
 .../integration/server/ResourceLimitTest.java      |   7 +-
 tests/smoke-tests/pom.xml                          |  19 +-
 .../MaxQueueResourceTest/artemis-roles.properties  |  18 ++
 .../MaxQueueResourceTest/artemis-users.properties  |  21 ++
 .../servers/MaxQueueResourceTest/broker.xml        | 246 +++++++++++++++++++++
 .../smoke/resourcetest/MaxQueueResourceTest.java   |  87 ++++++++
 8 files changed, 409 insertions(+), 13 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 3ae3eb9..6a37515 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -363,7 +363,7 @@ public interface ActiveMQMessageBundle {
    ActiveMQSessionCreationException sessionLimitReached(String username, int limit);
 
    @Message(id = 229111, value = "Too many queues created by user ''{0}''. Queues allowed: {1}.", format = Message.Format.MESSAGE_FORMAT)
-   ActiveMQSessionCreationException queueLimitReached(String username, int limit);
+   ActiveMQSecurityException queueLimitReached(String username, int limit);
 
    @Message(id = 229112, value = "Cannot set MBeanServer during startup or while started")
    IllegalStateException cannotSetMBeanserver();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index cfc95f6..5f83122 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1744,16 +1744,22 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    public int getQueueCountForUser(String username) throws Exception {
-      int queuesForUser = 0;
-
-      for (Binding binding : iterableOf(postOffice.getAllBindings())) {
-         if (binding instanceof LocalQueueBinding && ((LocalQueueBinding) binding).getQueue().getUser().equals(SimpleString.toSimpleString(username))) {
-            queuesForUser++;
+      SimpleString userNameSimpleString = SimpleString.toSimpleString(username);
+
+      AtomicInteger bindingsCount = new AtomicInteger(0);
+      postOffice.getAllBindings().forEach((b) -> {
+         if (b instanceof LocalQueueBinding) {
+            LocalQueueBinding l = (LocalQueueBinding) b;
+            SimpleString user = l.getQueue().getUser();
+            if (user != null) {
+               if (user.equals(userNameSimpleString)) {
+                  bindingsCount.incrementAndGet();
+               }
+            }
          }
-      }
-
-      return queuesForUser;
+      });
 
+      return bindingsCount.get();
    }
 
    protected ServerSessionImpl internalCreateSession(String name,
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ResourceLimitTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ResourceLimitTest.java
index e506fad..0c04129 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ResourceLimitTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ResourceLimitTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.server;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.ActiveMQSessionCreationException;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -104,7 +105,7 @@ public class ResourceLimitTest extends ActiveMQTestBase {
       try {
          clientSession.createQueue(new QueueConfiguration("anotherQueue").setAddress("address").setRoutingType(RoutingType.ANYCAST).setDurable(false));
       } catch (Exception e) {
-         assertTrue(e instanceof ActiveMQSessionCreationException);
+         assertTrue(e instanceof ActiveMQSecurityException);
       }
 
       clientSession.deleteQueue("queue");
@@ -114,13 +115,13 @@ public class ResourceLimitTest extends ActiveMQTestBase {
       try {
          clientSession.createQueue(new QueueConfiguration("anotherQueue").setAddress("address").setRoutingType(RoutingType.ANYCAST).setDurable(false));
       } catch (Exception e) {
-         assertTrue(e instanceof ActiveMQSessionCreationException);
+         assertTrue(e instanceof ActiveMQSecurityException);
       }
 
       try {
          clientSession.createSharedQueue(new QueueConfiguration("anotherQueue").setAddress("address").setDurable(false));
       } catch (Exception e) {
-         assertTrue(e instanceof ActiveMQSessionCreationException);
+         assertTrue(e instanceof ActiveMQSecurityException);
       }
    }
 }
diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml
index 3257936..5511fd1 100644
--- a/tests/smoke-tests/pom.xml
+++ b/tests/smoke-tests/pom.xml
@@ -739,7 +739,24 @@
                      <configuration>${basedir}/target/classes/servers/brokerConnectMirrorSecurityB</configuration>
                   </configuration>
                </execution>
-         </executions>
+
+               <!-- used on MaxQueueResourceTest  -->
+               <execution>
+                  <phase>test-compile</phase>
+                  <id>createBrokerMaxQueueResourceTest</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <allowAnonymous>false</allowAnonymous>
+                     <user>A</user>
+                     <password>A</password>
+                     <noWeb>true</noWeb>
+                     <instance>${basedir}/target/MaxQueueResourceTest</instance>
+                     <configuration>${basedir}/target/classes/servers/MaxQueueResourceTest</configuration>
+                  </configuration>
+               </execution>
+            </executions>
          <dependencies>
                <dependency>
                   <groupId>org.apache.activemq.tests</groupId>
diff --git a/tests/smoke-tests/src/main/resources/servers/MaxQueueResourceTest/artemis-roles.properties b/tests/smoke-tests/src/main/resources/servers/MaxQueueResourceTest/artemis-roles.properties
new file mode 100644
index 0000000..1093896
--- /dev/null
+++ b/tests/smoke-tests/src/main/resources/servers/MaxQueueResourceTest/artemis-roles.properties
@@ -0,0 +1,18 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+amq=admin,johnopenwire,johnamqp,johncore
\ No newline at end of file
diff --git a/tests/smoke-tests/src/main/resources/servers/MaxQueueResourceTest/artemis-users.properties b/tests/smoke-tests/src/main/resources/servers/MaxQueueResourceTest/artemis-users.properties
new file mode 100644
index 0000000..96737e1
--- /dev/null
+++ b/tests/smoke-tests/src/main/resources/servers/MaxQueueResourceTest/artemis-users.properties
@@ -0,0 +1,21 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+johnamqp = doe
+johnopenwire = doe
+johncore = doe
+admin = admin
\ No newline at end of file
diff --git a/tests/smoke-tests/src/main/resources/servers/MaxQueueResourceTest/broker.xml b/tests/smoke-tests/src/main/resources/servers/MaxQueueResourceTest/broker.xml
new file mode 100644
index 0000000..10dfa94
--- /dev/null
+++ b/tests/smoke-tests/src/main/resources/servers/MaxQueueResourceTest/broker.xml
@@ -0,0 +1,246 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xmlns:xi="http://www.w3.org/2001/XInclude"
+               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="urn:activemq:core ">
+
+      <name>0.0.0.0</name>
+
+
+      <!-- no need for persistence on this test -->
+      <persistence-enabled>false</persistence-enabled>
+
+      <!-- this could be ASYNCIO, MAPPED, NIO
+           ASYNCIO: Linux Libaio
+           MAPPED: mmap files
+           NIO: Plain Java Files
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>data/paging</paging-directory>
+
+      <bindings-directory>data/bindings</bindings-directory>
+
+      <journal-directory>data/journal</journal-directory>
+
+      <large-messages-directory>data/large-messages</large-messages-directory>
+
+      <journal-datasync>true</journal-datasync>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>10</journal-pool-files>
+
+      <journal-device-block-size>4096</journal-device-block-size>
+
+      <journal-file-size>10M</journal-file-size>
+      
+      <!--
+       This value was determined through a calculation.
+       Your system could perform 25 writes per millisecond
+       on the current journal configuration.
+       That translates as a sync write every 40000 nanoseconds.
+
+       Note: If you specify 0 the system will perform writes directly to the disk.
+             We recommend this to be 0 if you are using journalType=MAPPED and journal-datasync=false.
+      -->
+      <journal-buffer-timeout>40000</journal-buffer-timeout>
+
+
+      <!--
+        When using ASYNCIO, this will determine the writing queue depth for libaio.
+       -->
+      <journal-max-io>1</journal-max-io>
+      <!--
+        You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
+         <network-check-NIC>theNicName</network-check-NIC>
+        -->
+
+      <!--
+        Use this to use an HTTP server to validate the network
+         <network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+      <!-- <network-check-period>10000</network-check-period> -->
+      <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+      <!-- this is a comma separated list, no spaces, just DNS or IPs
+           it should accept IPV6
+
+           Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
+                    Using IPs that could eventually disappear or be partially visible may defeat the purpose.
+                    You can use a list of multiple IPs; any single successful ping is enough for the server to be OK to continue running -->
+      <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+      <!-- use this to customize the ping used for ipv4 addresses -->
+      <!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->
+
+      <!-- use this to customize the ping used for ipv6 addresses -->
+      <!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->
+
+
+
+
+      <!-- how often we are looking for how many bytes are being used on the disk in ms -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <!-- should the broker detect deadlocks and other issues -->
+      <critical-analyzer>true</critical-analyzer>
+
+      <critical-analyzer-timeout>120000</critical-analyzer-timeout>
+
+      <critical-analyzer-check-period>60000</critical-analyzer-check-period>
+
+      <critical-analyzer-policy>HALT</critical-analyzer-policy>
+
+      
+      <page-sync-timeout>40000</page-sync-timeout>
+
+
+      <acceptors>
+         <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
+      </acceptors>
+
+      <resource-limit-settings>
+         <resource-limit-setting match="johnamqp">
+            <max-queues>3</max-queues>
+         </resource-limit-setting>
+         <resource-limit-setting match="johnopenwire">
+            <!-- openwire has advisory queues, so we need extra ones -->
+            <max-queues>5</max-queues>
+         </resource-limit-setting>
+         <resource-limit-setting match="johncore">
+            <max-queues>3</max-queues>
+         </resource-limit-setting>
+      </resource-limit-settings>
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="amq"/>
+            <permission type="deleteNonDurableQueue" roles="amq"/>
+            <permission type="createDurableQueue" roles="amq"/>
+            <permission type="deleteDurableQueue" roles="amq"/>
+            <permission type="createAddress" roles="amq"/>
+            <permission type="deleteAddress" roles="amq"/>
+            <permission type="consume" roles="amq"/>
+            <permission type="browse" roles="amq"/>
+            <permission type="send" roles="amq"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="amq"/>
+         </security-setting>
+         <security-setting match="myTopic">
+         <permission type="createNonDurableQueue" roles="amq"/>
+         <permission type="deleteNonDurableQueue" roles="amq"/>
+         <permission type="createDurableQueue" roles="amq"/>
+         <permission type="deleteDurableQueue" roles="amq"/>
+         <permission type="createAddress" roles="amq"/>
+         <permission type="deleteAddress" roles="amq"/>
+         <permission type="consume" roles="amq"/>
+         <permission type="browse" roles="amq"/>
+         <permission type="send" roles="amq"/>
+         <!-- we need this otherwise ./artemis data imp wouldn't work -->
+         <permission type="manage" roles="amq"/>
+      </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to be auto-create -->
+         <address-setting match="activemq.management#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            <message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-create-jms-queues>true</auto-create-jms-queues>
+            <auto-create-jms-topics>true</auto-create-jms-topics>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            <message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+            <auto-create-jms-queues>true</auto-create-jms-queues>
+            <auto-create-jms-topics>true</auto-create-jms-topics>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ" />
+            </anycast>
+         </address>
+         <address name="ExpiryQueue">
+            <anycast>
+               <queue name="ExpiryQueue" />
+            </anycast>
+         </address>
+         <address name="toB">
+            <anycast>
+               <queue name="toB" />
+            </anycast>
+         </address>
+         <address name="toA">
+            <anycast>
+               <queue name="toA" />
+            </anycast>
+         </address>
+
+         <address name="myTopic">
+            <multicast/>
+         </address>
+
+      </addresses>
+
+
+      <!-- Uncomment the following if you want to use the standard LoggingActiveMQServerPlugin plugin to log events
+      <broker-plugins>
+         <broker-plugin class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
+            <property key="LOG_ALL_EVENTS" value="true"/>
+            <property key="LOG_CONNECTION_EVENTS" value="true"/>
+            <property key="LOG_SESSION_EVENTS" value="true"/>
+            <property key="LOG_CONSUMER_EVENTS" value="true"/>
+            <property key="LOG_DELIVERING_EVENTS" value="true"/>
+            <property key="LOG_SENDING_EVENTS" value="true"/>
+            <property key="LOG_INTERNAL_EVENTS" value="true"/>
+         </broker-plugin>
+      </broker-plugins>
+      -->
+
+   </core>
+</configuration>
diff --git a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/resourcetest/MaxQueueResourceTest.java b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/resourcetest/MaxQueueResourceTest.java
new file mode 100644
index 0000000..b732c03
--- /dev/null
+++ b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/resourcetest/MaxQueueResourceTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <br>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <br>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.resourcetest;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSSecurityException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MaxQueueResourceTest extends SmokeTestBase {
+
+   public static final String SERVER_NAME_A = "MaxQueueResourceTest";
+
+   @Before
+   public void before() throws Exception {
+      startServer(SERVER_NAME_A, 0, 0);
+      ServerUtil.waitForServerToStart(0, "admin", "admin", 30000);
+   }
+
+   @Test
+   public void testMaxQueue() throws Throwable {
+      // We exercise the three protocols in sequence here for two reasons:
+      // 1st: to actually test each protocol
+      // 2nd: having more users creating resources makes the test more challenging (just in case)
+      //
+      // Note that the protocol name is appended to both the user name and the clientID;
+      // the matching users and their queue limits are prepared in the configuration of the server used by this test.
+      internalMaxQueue("core");
+      internalMaxQueue("openwire");
+      internalMaxQueue("amqp");
+   }
+
+   private void internalMaxQueue(String protocol) throws Throwable {
+      ConnectionFactory cfA = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
+
+
+      try (Connection connectionA = cfA.createConnection("john" + protocol, "doe")) {
+         connectionA.setClientID("c1" + protocol);
+         Session sessionA = connectionA.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = sessionA.createTopic("myTopic");
+         MessageConsumer consumer1 = sessionA.createDurableSubscriber(topic, "t1");
+         MessageConsumer consumer2 = sessionA.createDurableSubscriber(topic, "t2");
+         MessageConsumer consumer3 = sessionA.createDurableSubscriber(topic, "t3");
+         Exception exception = null;
+         MessageConsumer consumer4 = null;
+
+         try {
+            consumer4 = sessionA.createDurableSubscriber(topic, "t4");
+         } catch (JMSSecurityException e) {
+            exception = e;
+         }
+         Assert.assertNull(consumer4);
+         Assert.assertNotNull(exception);
+         MessageProducer producerA = sessionA.createProducer(topic);
+         for (int i = 0; i < 10; i++) {
+            producerA.send(sessionA.createTextMessage("toB"));
+         }
+
+      }
+   }
+
+}