You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ru...@apache.org on 2010/05/12 19:24:47 UTC
svn commit: r943590 - in
/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse:
config/xml/ mediators/eip/sample/
Author: ruwan
Date: Wed May 12 17:24:47 2010
New Revision: 943590
URL: http://svn.apache.org/viewvc?rev=943590&view=rev
Log:
Adding the sampling throttle mediator
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SamplingThrottleMediatorFactory.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SamplingThrottleMediatorSerializer.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/MessageQueue.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/SamplingThrottleMediator.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/UnboundedMessageQueue.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java?rev=943590&r1=943589&r2=943590&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorFactoryFinder.java Wed May 12 17:24:47 2010
@@ -75,7 +75,8 @@ public class MediatorFactoryFinder imple
EventPublisherMediatorFactory.class,
TransactionMediatorFactory.class,
EnqueueMediatorFactory.class,
- ConditionalRouterMediatorFactory.class
+ ConditionalRouterMediatorFactory.class,
+ SamplingThrottleMediatorFactory.class
};
private final static MediatorFactoryFinder instance = new MediatorFactoryFinder();
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java?rev=943590&r1=943589&r2=943590&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/MediatorSerializerFinder.java Wed May 12 17:24:47 2010
@@ -62,7 +62,8 @@ public class MediatorSerializerFinder {
EventPublisherMediatorSerializer.class,
TransactionMediatorSerializer.class,
EnqueueMediatorSerializer.class,
- ConditionalRouterMediatorSerializer.class
+ ConditionalRouterMediatorSerializer.class,
+ SamplingThrottleMediatorSerializer.class
};
private final static MediatorSerializerFinder instance = new MediatorSerializerFinder();
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SamplingThrottleMediatorFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SamplingThrottleMediatorFactory.java?rev=943590&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SamplingThrottleMediatorFactory.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SamplingThrottleMediatorFactory.java Wed May 12 17:24:47 2010
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.config.xml;
+
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.mediators.eip.Target;
+import org.apache.synapse.mediators.eip.sample.MessageQueue;
+import org.apache.synapse.mediators.eip.sample.SamplingThrottleMediator;
+
+import javax.xml.namespace.QName;
+
+/**
+ * Builds the {@link org.apache.synapse.mediators.eip.sample.SamplingThrottleMediator} instance by looking at the
+ * following configuration</p>
+ *
+ * <pre><sampler id="string" rate="int" unitTime="long">
+ * <messageQueue class="string"/>
+ * <target .../>
+ * <sampler/>
+ * </pre>
+ *
+ * @see org.apache.synapse.config.xml.AbstractMediatorFactory
+ */
+public class SamplingThrottleMediatorFactory extends AbstractMediatorFactory {
+
+ private static final QName SAMPLER_Q
+ = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "sampler");
+ private static final QName ID_ATTR
+ = new QName(XMLConfigConstants.NULL_NAMESPACE, "id");
+ private static final QName RATE_ATTR
+ = new QName(XMLConfigConstants.NULL_NAMESPACE, "rate");
+ private static final QName UNIT_TIME_ATTR
+ = new QName(XMLConfigConstants.NULL_NAMESPACE, "unitTime");
+ private static final QName MESSAGE_QUEUE_Q
+ = new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "messageQueue");
+ private static final QName CLASS_ATTR
+ = new QName(XMLConfigConstants.NULL_NAMESPACE, "class");
+
+ public Mediator createMediator(OMElement omElement) {
+
+ SamplingThrottleMediator samplingThrottleMediator = new SamplingThrottleMediator();
+ processAuditStatus(samplingThrottleMediator, omElement);
+
+ OMAttribute idAttribute = omElement.getAttribute(ID_ATTR);
+ if (idAttribute != null) {
+ samplingThrottleMediator.setId(idAttribute.getAttributeValue());
+ }
+
+ OMAttribute rateAttribute = omElement.getAttribute(RATE_ATTR);
+ if (rateAttribute != null) {
+ try {
+ samplingThrottleMediator.setSamplingRate(
+ Integer.parseInt(rateAttribute.getAttributeValue()));
+ } catch (NumberFormatException nfe) {
+ handleException("Sampling rate has to be an integer value, but found : "
+ + rateAttribute.getAttributeValue());
+ }
+ }
+
+ OMAttribute unitTimeAttribute = omElement.getAttribute(UNIT_TIME_ATTR);
+ if (unitTimeAttribute != null) {
+ try {
+ samplingThrottleMediator.setUnitTime(
+ Long.parseLong(unitTimeAttribute.getAttributeValue()));
+ } catch (NumberFormatException nfe) {
+ handleException("Sampling unitTime has to be a long value in milliseconds, " +
+ "but found : " + rateAttribute.getAttributeValue());
+ }
+ }
+
+ OMElement targetElem = omElement.getFirstChildWithName(TARGET_Q);
+ if (targetElem != null) {
+ Target target = TargetFactory.createTarget(targetElem);
+ samplingThrottleMediator.setTarget(target);
+ } else {
+ handleException("Sampler requires a target for the sampling mediation");
+ }
+
+ OMElement messageQueueElem = omElement.getFirstChildWithName(MESSAGE_QUEUE_Q);
+ if (messageQueueElem != null && messageQueueElem.getAttribute(CLASS_ATTR) != null) {
+ String className = messageQueueElem.getAttributeValue(CLASS_ATTR);
+ try {
+ Class messageQueueImplClass = Class.forName(className);
+ Object obj = messageQueueImplClass.newInstance();
+ if (obj instanceof MessageQueue) {
+ samplingThrottleMediator.setMessageQueue((MessageQueue) obj);
+ } else {
+ handleException("Provided message queue class : " + className
+ + " doesn't implement the org.apache.synapse.mediators." +
+ "eip.sample.MessageQueue interface");
+ }
+ } catch (ClassNotFoundException e) {
+ handleException("Couldn't find the class specified for the message queue " +
+ "implementation : " + className);
+ } catch (InstantiationException e) {
+ handleException("Couldn't instantiate the message queue : " + className);
+ } catch (IllegalAccessException e) {
+ handleException("Couldn't instantiate the message queue : " + className);
+ }
+ }
+
+ return samplingThrottleMediator;
+ }
+
+ public QName getTagQName() {
+ return SAMPLER_Q;
+ }
+}
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SamplingThrottleMediatorSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SamplingThrottleMediatorSerializer.java?rev=943590&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SamplingThrottleMediatorSerializer.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/SamplingThrottleMediatorSerializer.java Wed May 12 17:24:47 2010
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.config.xml;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.mediators.eip.sample.SamplingThrottleMediator;
+
+/**
+ * Serializes the {@link org.apache.synapse.mediators.eip.sample.SamplingThrottleMediator} instance
+ * into a XML configuration as follows</p>
+ *
+ * <pre><sampler id="string" rate="int"/></pre>
+ *
+ * @see org.apache.synapse.config.xml.AbstractMediatorSerializer
+ */
+public class SamplingThrottleMediatorSerializer extends AbstractMediatorSerializer {
+
+ public OMElement serializeMediator(OMElement omElement, Mediator mediator) {
+ OMElement samplerElem = fac.createOMElement("sampler", synNS);
+ saveTracingState(samplerElem, mediator);
+
+ SamplingThrottleMediator samplingThrottleMediator = (SamplingThrottleMediator) mediator;
+
+ if (samplingThrottleMediator.getId() != null) {
+ samplerElem.addAttribute("id", samplingThrottleMediator.getId(), nullNS);
+ }
+ samplerElem.addAttribute("rate",
+ Integer.toString(samplingThrottleMediator.getSamplingRate()), nullNS);
+ samplerElem.addAttribute("unitTime",
+ Long.toString(samplingThrottleMediator.getUnitTime()), nullNS);
+
+ if (samplingThrottleMediator.isMessageQueueExplicitlySet()) {
+ OMElement messageQueueElem = fac.createOMElement("messageQueue", synNS);
+ messageQueueElem.addAttribute("class",
+ samplingThrottleMediator.getMessageQueue().getClass().getName(), nullNS);
+ samplerElem.addChild(messageQueueElem);
+ }
+
+ if (samplingThrottleMediator.getTarget() != null) {
+ samplerElem.addChild(
+ TargetSerializer.serializeTarget(samplingThrottleMediator.getTarget()));
+ } else {
+ handleException("Couldn't find the target for the sampler. " +
+ "Target is mandatory for a sampler");
+ }
+
+ if (omElement != null) {
+ omElement.addChild(samplerElem);
+ }
+
+ return samplerElem;
+ }
+
+ public String getMediatorClassName() {
+ return SamplingThrottleMediator.class.getName();
+ }
+}
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/MessageQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/MessageQueue.java?rev=943590&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/MessageQueue.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/MessageQueue.java Wed May 12 17:24:47 2010
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.mediators.eip.sample;
+
+import org.apache.synapse.MessageContext;
+
+/**
+ *
+ */
+public interface MessageQueue {
+
+ void add(MessageContext synCtx);
+
+ MessageContext get();
+
+ boolean isEmpty();
+
+ boolean isPersistent();
+
+ boolean persist();
+
+ void load();
+}
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/SamplingThrottleMediator.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/SamplingThrottleMediator.java?rev=943590&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/SamplingThrottleMediator.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/SamplingThrottleMediator.java Wed May 12 17:24:47 2010
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.mediators.eip.sample;
+
+import org.apache.axis2.Constants;
+import org.apache.axis2.context.OperationContext;
+import org.apache.synapse.ManagedLifecycle;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseLog;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.mediators.AbstractMediator;
+import org.apache.synapse.mediators.eip.Target;
+
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * This implements the well known <code>Sample</code> EIP (Enterprise Integration Pattern), which controls the flow
+ * of messages and limit the rate at which the messages are flowing through the sampler</p>
+ *
+ * <p>Please note that the usage of this will require the sampler to be on the out-flow as well to correctly
+ * determine & to manage the rate.</p>
+ *
+ * @see org.apache.synapse.mediators.AbstractMediator
+ */
+public class SamplingThrottleMediator extends AbstractMediator implements ManagedLifecycle {
+
+ /** Rate at which this mediator allows the flow in TPS */
+ private int samplingRate = 1;
+
+ /** Unit time in milliseconds applied to the <code>samplingRate</code> */
+ private long unitTime = 1000;
+
+ /** Identifier is used to co-relate the in and out path samplers */
+ private String id;
+
+ /** Target to be used for mediation from the sampler */
+ private Target target;
+
+ /**
+ * {@link org.apache.synapse.mediators.eip.sample.MessageQueue} implementation to be used,
+ * which defaults to {@link org.apache.synapse.mediators.eip.sample.UnboundedMessageQueue}
+ */
+ private MessageQueue messageQueue = new UnboundedMessageQueue();
+
+ private boolean messageQueueExplicitlySet;
+
+ private TimerTask messageProcessor;
+
+ public void init(SynapseEnvironment synapseEnvironment) {
+
+ if (messageQueue.isPersistent()) {
+ log.info("Loading the persisted messages if there are any to the message queue");
+ messageQueue.load();
+ }
+
+ Timer samplingTimer = synapseEnvironment.getSynapseConfiguration().getSynapseTimer();
+ messageProcessor = new MessageProcessor();
+ log.info("Scheduling the sampling timer to invoke the message processor " +
+ "at an interval of : " + unitTime);
+ samplingTimer.schedule(messageProcessor, 0, unitTime);
+ }
+
+ public void destroy() {
+ messageProcessor.cancel();
+ if (!messageQueue.isEmpty()) {
+ log.warn("There are messages on the sampling message queue, " +
+ "but the message processor has been destroyed.");
+ if (messageQueue.isPersistent()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Persisting the messages on the message queue");
+ }
+ if (messageQueue.persist()) {
+ log.info("Completed persisting the messages on the message queue");
+ } else {
+ log.error("Couldn't persist the messages on the message queue");
+ }
+ } else {
+ log.warn("You are not using a persistent message queue, " +
+ "you will be loosing messages which are on the queue");
+ }
+ }
+ }
+
+ public boolean mediate(MessageContext messageContext) {
+
+ SynapseLog synLog = getLog(messageContext);
+
+ synLog.traceOrDebug("Start : Sampler mediator");
+ if (synLog.isTraceTraceEnabled()) {
+ synLog.traceTrace("Message : " + messageContext.getEnvelope());
+ }
+
+ if (!messageContext.isResponse()) {
+ if (synLog.isTraceOrDebugEnabled()) {
+ synLog.traceOrDebug("Adding the message with message id : "
+ + messageContext.getMessageID() + " into the message queue for sampling");
+ }
+ messageQueue.add(messageContext);
+ } else {
+ synLog.auditWarn("Encountered a response message which will not be sampled");
+ }
+
+ OperationContext opCtx
+ = ((Axis2MessageContext) messageContext).getAxis2MessageContext().getOperationContext();
+ if (opCtx != null) {
+ opCtx.setProperty(Constants.RESPONSE_WRITTEN, "SKIP");
+ }
+
+ synLog.traceOrDebug("End : Sampler mediator");
+ return false;
+ }
+
+ public int getSamplingRate() {
+ return samplingRate;
+ }
+
+ public void setSamplingRate(int samplingRate) {
+ this.samplingRate = samplingRate;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public Target getTarget() {
+ return target;
+ }
+
+ public void setTarget(Target target) {
+ this.target = target;
+ }
+
+ public long getUnitTime() {
+ return unitTime;
+ }
+
+ public void setUnitTime(long unitTime) {
+ this.unitTime = unitTime;
+ }
+
+ public MessageQueue getMessageQueue() {
+ return messageQueue;
+ }
+
+ public void setMessageQueue(MessageQueue messageQueue) {
+ this.messageQueue = messageQueue;
+ this.messageQueueExplicitlySet = true;
+ }
+
+ public boolean isMessageQueueExplicitlySet() {
+ return messageQueueExplicitlySet;
+ }
+
+ private class MessageProcessor extends TimerTask {
+
+ @Override
+ public void run() {
+ if (log.isDebugEnabled()) {
+ log.debug("Started running the message processor");
+ }
+ for (int i = 0; i < samplingRate && !messageQueue.isEmpty(); i++) {
+ MessageContext synCtx = messageQueue.get();
+ if (log.isDebugEnabled()) {
+ log.debug("Mediating the message on the message queue with message id : "
+ + synCtx.getMessageID());
+ }
+ target.mediate(synCtx);
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Message processing completed for the given sampling rate");
+ }
+ }
+ }
+
+}
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/UnboundedMessageQueue.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/UnboundedMessageQueue.java?rev=943590&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/UnboundedMessageQueue.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/mediators/eip/sample/UnboundedMessageQueue.java Wed May 12 17:24:47 2010
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.mediators.eip.sample;
+
+import org.apache.synapse.MessageContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *
+ */
+public class UnboundedMessageQueue implements MessageQueue {
+
+ private List<MessageContext> messageQueue = new ArrayList<MessageContext>();
+
+ public void add(MessageContext synCtx) {
+ messageQueue.add(synCtx);
+ }
+
+ public MessageContext get() {
+ if (!messageQueue.isEmpty()) {
+ return messageQueue.remove(0);
+ } else {
+ return null;
+ }
+ }
+
+ public boolean isEmpty() {
+ return messageQueue.isEmpty();
+ }
+
+ public boolean isPersistent() {
+ return false;
+ }
+
+ public boolean persist() {
+ return false;
+ }
+
+ public void load() {
+ throw new UnsupportedOperationException("Not Implemented");
+ }
+
+}