You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2012/09/13 15:52:24 UTC

svn commit: r1384336 - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/java/org/apache/hedwig/client/handlers/ hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig-server/src/main/java/org/apache/hedwig/server/common/

Author: sijie
Date: Thu Sep 13 13:52:24 2012
New Revision: 1384336

URL: http://svn.apache.org/viewvc?rev=1384336&view=rev
Log:
 BOOKKEEPER-392: Racey ConcurrentMap usage in java hedwig-client (Stu Hood via sijie)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1384336&r1=1384335&r2=1384336&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Sep 13 13:52:24 2012
@@ -96,6 +96,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-371: NPE in hedwig hub client causes hedwig hub to shut down. (Aniruddha via sijie)
 
+        BOOKKEEPER-392: Racey ConcurrentMap usage in java hedwig-client (Stu Hood via sijie)
+
       hedwig-server:
 
         BOOKKEEPER-302: No more messages delivered when hub server scans messages over two ledgers. (sijie via ivank)

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java?rev=1384336&r1=1384335&r2=1384336&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/SubscribeResponseHandler.java Thu Sep 13 13:52:24 2012
@@ -314,7 +314,7 @@ public class SubscribeResponseHandler {
         // outstanding message limit was reached. For now, only turn the
         // delivery back on if there are no more outstanding messages to
         // consume. We could make this a configurable parameter if needed.
-        if (!subscribeChannel.isReadable() && outstandingMsgSet.size() == 0) {
+        if (!subscribeChannel.isReadable() && outstandingMsgSet.isEmpty()) {
             if (logger.isDebugEnabled())
                 logger
                 .debug("Message consumption has caught up so okay to turn off throttling of messages on the subscribe channel for topic: "

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java?rev=1384336&r1=1384335&r2=1384336&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigClientImpl.java Thu Sep 13 13:52:24 2012
@@ -18,11 +18,10 @@
 package org.apache.hedwig.client.netty;
 
 import java.net.InetSocketAddress;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
@@ -76,7 +75,8 @@ public class HedwigClientImpl implements
     // also want to remove all topic mappings the host was responsible for.
     // The second Map is used as the inverted version of the first one.
     protected final ConcurrentMap<ByteString, InetSocketAddress> topic2Host = new ConcurrentHashMap<ByteString, InetSocketAddress>();
-    private final ConcurrentMap<InetSocketAddress, List<ByteString>> host2Topics = new ConcurrentHashMap<InetSocketAddress, List<ByteString>>();
+    private final ConcurrentMap<InetSocketAddress, ConcurrentLinkedQueue<ByteString>> host2Topics =
+        new ConcurrentHashMap<InetSocketAddress, ConcurrentLinkedQueue<ByteString>>();
 
     // Each client instantiation will have a Timer for running recurring
     // threads. One such timer task thread to is to timeout long running
@@ -292,23 +292,27 @@ public class HedwigClientImpl implements
         // server statuses, we consider that as a successful connection to the
         // correct topic master.
         InetSocketAddress host = getHostFromChannel(channel);
-        if (topic2Host.containsKey(pubSubData.topic) && topic2Host.get(pubSubData.topic).equals(host)) {
+        InetSocketAddress existingHost = topic2Host.get(pubSubData.topic);
+        if (existingHost != null && existingHost.equals(host)) {
             // Entry in map exists for the topic but it is the same as the
             // current host. In this case there is nothing to do.
             return;
         }
 
         // Store the relevant mappings for this topic and host combination.
-        if (logger.isDebugEnabled())
-            logger.debug("Storing info for topic: " + pubSubData.topic.toStringUtf8() + ", old host: "
-                         + topic2Host.get(pubSubData.topic) + ", new host: " + host);
-        topic2Host.put(pubSubData.topic, host);
-        if (host2Topics.containsKey(host)) {
-            host2Topics.get(host).add(pubSubData.topic);
-        } else {
-            LinkedList<ByteString> topicsList = new LinkedList<ByteString>();
-            topicsList.add(pubSubData.topic);
-            host2Topics.put(host, topicsList);
+        if (topic2Host.putIfAbsent(pubSubData.topic, host) == null) {
+            if (logger.isDebugEnabled())
+                logger.debug("Stored info for topic: " + pubSubData.topic.toStringUtf8() + ", old host: "
+                            + existingHost + ", new host: " + host);
+            ConcurrentLinkedQueue<ByteString> topicsForHost = host2Topics.get(host);
+            if (topicsForHost == null) {
+                ConcurrentLinkedQueue<ByteString> newTopicsList = new ConcurrentLinkedQueue<ByteString>();
+                topicsForHost = host2Topics.putIfAbsent(host, newTopicsList);
+                if (topicsForHost == null) {
+                  topicsForHost = newTopicsList;
+                }
+            }
+            topicsForHost.add(pubSubData.topic);
         }
     }
 
@@ -360,14 +364,15 @@ public class HedwigClientImpl implements
             logger.debug("Clearing all topics for host: " + host);
         // For each of the topics that the host was responsible for,
         // remove it from the topic2Host mapping.
-        if (host2Topics.containsKey(host)) {
-            for (ByteString topic : host2Topics.get(host)) {
+        ConcurrentLinkedQueue<ByteString> topicsForHost = host2Topics.get(host);
+        if (topicsForHost != null) {
+            for (ByteString topic : topicsForHost) {
                 if (logger.isDebugEnabled())
                     logger.debug("Removing mapping for topic: " + topic.toStringUtf8() + " from host: " + host);
-                topic2Host.remove(topic);
+                topic2Host.remove(topic, host);
             }
             // Now it is safe to remove the host2Topics mapping entry.
-            host2Topics.remove(host);
+            host2Topics.remove(host, topicsForHost);
         }
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java?rev=1384336&r1=1384335&r2=1384336&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigPublisher.java Thu Sep 13 13:52:24 2012
@@ -135,14 +135,15 @@ public class HedwigPublisher implements 
         // Check if we already have a Channel connection set up to the server
         // for the given Topic.
         PubSubData pubSubData = new PubSubData(topic, msg, null, OperationType.PUBLISH, null, callback, context);
-        if (client.topic2Host.containsKey(topic)) {
-            InetSocketAddress host = client.topic2Host.get(topic);
-            if (host2Channel.containsKey(host)) {
+        InetSocketAddress host = client.topic2Host.get(topic);
+        if (host != null) {
+            Channel channel = host2Channel.get(host);
+            if (channel != null) {
                 // We already have the Channel connection for the server host so
                 // do the publish directly. We will deal with redirect logic
                 // later on if that server is no longer the current host for
                 // the topic.
-                doPublish(pubSubData, host2Channel.get(host));
+                doPublish(pubSubData, channel);
             } else {
                 // We have a mapping for the topic to host but don't have a
                 // Channel for that server. This can happen if the Channel
@@ -155,10 +156,11 @@ public class HedwigPublisher implements 
             // default server host/port as defined in the configs. This should
             // point to the server VIP which would redirect to a random server
             // (which might not be the server hosting the topic).
-            InetSocketAddress host = cfg.getDefaultServerHost();
-            if (host2Channel.containsKey(host)) {
+            host = cfg.getDefaultServerHost();
+            Channel channel = host2Channel.get(host);
+            if (channel != null) {
                 // if there is a channel to default server, use it!
-                doPublish(pubSubData, host2Channel.get(host));
+                doPublish(pubSubData, channel);
                 return;
             }
             client.doConnect(pubSubData, host);
@@ -226,10 +228,9 @@ public class HedwigPublisher implements 
     // RemoteAddress tied to it.
     protected synchronized void storeHost2ChannelMapping(Channel channel) {
         InetSocketAddress host = HedwigClientImpl.getHostFromChannel(channel);
-        if (!closed && !host2Channel.containsKey(host)) {
+        if (!closed && host2Channel.putIfAbsent(host, channel) == null) {
             if (logger.isDebugEnabled())
-                logger.debug("Storing a new Channel mapping for host: " + host);
-            host2Channel.put(host, channel);
+                logger.debug("Stored a new Channel mapping for host: " + host);
         } else {
             // If we've reached here, that means we already have a Channel
             // mapping for the given host. This should ideally not happen

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java?rev=1384336&r1=1384335&r2=1384336&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/HedwigSubscriber.java Thu Sep 13 13:52:24 2012
@@ -84,7 +84,7 @@ public class HedwigSubscriber implements
 
     protected final HedwigClientImpl client;
     protected final ClientConfiguration cfg;
-    private Object closeLock = new Object();
+    private final Object closeLock = new Object();
     private boolean closed = false;
 
     public HedwigSubscriber(HedwigClientImpl client) {
@@ -175,14 +175,17 @@ public class HedwigSubscriber implements
         // subscribing to.
         PubSubData pubSubData = new PubSubData(topic, null, subscriberId, operationType, options, callback,
                                                context);
-        if (client.topic2Host.containsKey(topic)) {
-            InetSocketAddress host = client.topic2Host.get(topic);
-            if (operationType.equals(OperationType.UNSUBSCRIBE) && client.getPublisher().host2Channel.containsKey(host)) {
+
+        InetSocketAddress host = client.topic2Host.get(topic);
+        if (host != null) {
+            Channel existingChannel = null;
+            if (operationType.equals(OperationType.UNSUBSCRIBE) &&
+                (existingChannel = client.getPublisher().host2Channel.get(host)) != null) {
                 // For unsubscribes, we can reuse the channel connections to the
                 // server host that are cached for publishes. For publish and
                 // unsubscribe flows, we will thus use the same Channels and
                 // will cache and store them during the ConnectCallback.
-                doSubUnsub(pubSubData, client.getPublisher().host2Channel.get(host));
+                doSubUnsub(pubSubData, existingChannel);
             } else {
                 // We know which server host is the master for the topic so
                 // connect to that first. For subscribes, we want a new channel
@@ -348,7 +351,8 @@ public class HedwigSubscriber implements
                          + subscriberId.toStringUtf8() + ", messageSeqId: " + messageSeqId);
         TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
         // Check that this topic subscription on the client side exists.
-        if (!topicSubscriber2Channel.containsKey(topicSubscriber)) {
+        Channel channel = topicSubscriber2Channel.get(topicSubscriber);
+        if (channel == null) {
             throw new ClientNotSubscribedException(
                 "Cannot send consume message since client is not subscribed to topic: " + topic.toStringUtf8()
                 + ", subscriberId: " + subscriberId.toStringUtf8());
@@ -356,7 +360,7 @@ public class HedwigSubscriber implements
         PubSubData pubSubData = new PubSubData(topic, null, subscriberId, OperationType.CONSUME, null, null, null);
         // Send the consume message to the server using the same subscribe
         // channel that the topic subscription uses.
-        doConsume(pubSubData, topicSubscriber2Channel.get(topicSubscriber), messageSeqId);
+        doConsume(pubSubData, channel, messageSeqId);
     }
 
     /**
@@ -586,20 +590,16 @@ public class HedwigSubscriber implements
         // exists. The assumption is that the client should have in memory the
         // Channel created for the TopicSubscriber once the server has sent
         // an ack response to the initial subscribe request.
-        if (!topicSubscriber2Channel.containsKey(topicSubscriber)) {
+        Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
+        if (topicSubscriberChannel == null) {
             logger.error("Client is not yet subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: "
                          + subscriberId.toStringUtf8());
             throw new ClientNotSubscribedException("Client is not yet subscribed to topic: " + topic.toStringUtf8()
                                                    + ", subscriberId: " + subscriberId.toStringUtf8());
         }
 
-        // Register the MessageHandler with the subscribe Channel's
-        // Response Handler.
-        Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
-
         // Need to ensure the setting of handler and the readability of channel is in sync
         // as there's a race condition that connection recovery and user might call this at the same time
-
         MessageHandler existedMsgHandler = topicSubscriber2MessageHandler.get(topicSubscriber);
         if (restart) {
             // restart using existing msg handler 
@@ -620,7 +620,7 @@ public class HedwigSubscriber implements
             .setMessageHandler(messageHandler);
         } catch (NoResponseHandlerException e) {
             // We did not find a response handler. So remove this subscription handler and throw an exception.
-            topicSubscriber2MessageHandler.remove(topicSubscriber);
+            topicSubscriber2MessageHandler.remove(topicSubscriber, existedMsgHandler);
             asyncCloseSubscription(topic, subscriberId, new Callback<Void>() {
                 @Override
                 public void operationFinished(Object ctx, Void resultOfOperation) {
@@ -664,7 +664,8 @@ public class HedwigSubscriber implements
         // exists. The assumption is that the client should have in memory the
         // Channel created for the TopicSubscriber once the server has sent
         // an ack response to the initial subscribe request.
-        if (!topicSubscriber2Channel.containsKey(topicSubscriber)) {
+        Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
+        if (topicSubscriberChannel == null) {
             logger.error("Client is not yet subscribed to topic: " + topic.toStringUtf8() + ", subscriberId: "
                          + subscriberId.toStringUtf8());
             throw new ClientNotSubscribedException("Client is not yet subscribed to topic: " + topic.toStringUtf8()
@@ -673,7 +674,6 @@ public class HedwigSubscriber implements
 
         // Unregister the MessageHandler for the subscribe Channel's
         // Response Handler.
-        Channel topicSubscriberChannel = topicSubscriber2Channel.get(topicSubscriber);
         try {
             HedwigClientImpl.getResponseHandlerFromChannel(topicSubscriberChannel).getSubscribeResponseHandler()
                 .setMessageHandler(null);
@@ -729,10 +729,9 @@ public class HedwigSubscriber implements
             logger.debug("Closing subscription asynchronously for topic: " + topic.toStringUtf8() + ", subscriberId: "
                          + subscriberId.toStringUtf8());
         TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
-        if (topicSubscriber2Channel.containsKey(topicSubscriber)) {
-            // Remove all cached references for the TopicSubscriber
-            Channel channel = topicSubscriber2Channel.get(topicSubscriber);
-            topicSubscriber2Channel.remove(topicSubscriber);
+        // Remove all cached references for the TopicSubscriber
+        Channel channel = topicSubscriber2Channel.remove(topicSubscriber);
+        if (channel != null) {
             // Close the subscribe channel asynchronously.
             try {
                 HedwigClientImpl.getResponseHandlerFromChannel(channel).handleChannelClosedExplicitly();
@@ -779,6 +778,7 @@ public class HedwigSubscriber implements
             }
             Channel oldc = topicSubscriber2Channel.putIfAbsent(topic, channel);
             if (oldc != null) {
+                logger.warn("Dropping new channel for " + topic + ", due to existing channel: " + oldc);
                 channel.close();
             }
             if (null != preferences) {

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java?rev=1384336&r1=1384335&r2=1384336&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/ResponseHandler.java Thu Sep 13 13:52:24 2012
@@ -127,9 +127,8 @@ public class ResponseHandler extends Sim
 
         // Response is an ack to a prior PubSubRequest so first retrieve the
         // PubSub data for this txn.
-        PubSubData pubSubData = txn2PubSubData.containsKey(response.getTxnId()) ? txn2PubSubData.get(response
-                                .getTxnId()) : null;
-        // Validate that the PubSub data for this txn is stored. If not, just
+        PubSubData pubSubData = txn2PubSubData.remove(response.getTxnId());
+        // Validate that the PubSub data for this txn was stored. If not, just
         // log an error message and return since we don't know how to handle
         // this.
         if (pubSubData == null) {
@@ -137,10 +136,6 @@ public class ResponseHandler extends Sim
             return;
         }
 
-        // Now that we've retrieved the PubSubData for this specific Txn ID, we
-        // can remove it from the Map.
-        txn2PubSubData.remove(response.getTxnId());
-
         // Store the topic2Host mapping if this wasn't a server redirect. We'll
         // assume that if the server was able to have an open Channel connection
         // to the client, and responded with an ack message other than the
@@ -242,8 +237,8 @@ public class ResponseHandler extends Sim
 
         // Check if we already have a Channel open to the redirected server
         // host.
-        boolean redirectedHostChannelExists = pub.host2Channel.containsKey(redirectedHost) ? true : false;
-        if (pubSubData.operationType.equals(OperationType.SUBSCRIBE) || !redirectedHostChannelExists) {
+        Channel redirectedHostChannel = pub.host2Channel.get(redirectedHost);
+        if (pubSubData.operationType.equals(OperationType.SUBSCRIBE) || redirectedHostChannel == null) {
             // We don't have an existing channel to the redirected host OR this
             // is a redirected Subscribe request. For Subscribe requests, we
             // always want to create a new unique Channel connection to the
@@ -254,9 +249,9 @@ public class ResponseHandler extends Sim
             // request again directly on the existing cached redirected host
             // channel.
             if (pubSubData.operationType.equals(OperationType.PUBLISH)) {
-                pub.doPublish(pubSubData, pub.host2Channel.get(redirectedHost));
+                pub.doPublish(pubSubData, redirectedHostChannel);
             } else if (pubSubData.operationType.equals(OperationType.UNSUBSCRIBE)) {
-                sub.doSubUnsub(pubSubData, pub.host2Channel.get(redirectedHost));
+                sub.doSubUnsub(pubSubData, redirectedHostChannel);
             }
         }
     }
@@ -295,12 +290,14 @@ public class ResponseHandler extends Sim
             // Due to race concurrency situations, it is possible to
             // create multiple channels to the same host for publish
             // and unsubscribe requests.
-            if (pub.host2Channel.containsKey(host) && pub.host2Channel.get(host).equals(ctx.getChannel())) {
+            Channel channel = pub.host2Channel.get(host);
+            if (channel != null && channel.equals(ctx.getChannel())) {
                 if (logger.isDebugEnabled())
                     logger.debug("Disconnected channel for host: " + host
                                  + " was for Publish/Unsubscribe requests so remove all references to it.");
-                pub.host2Channel.remove(host);
-                client.clearAllTopicsForHost(host);
+                if (pub.host2Channel.remove(host, channel)) {
+                    client.clearAllTopicsForHost(host);
+                }
             }
         } else {
             // Subscribe channel disconnected so first close and clear all

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java?rev=1384336&r1=1384335&r2=1384336&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ByteStringInterner.java Thu Sep 13 13:52:24 2012
@@ -26,20 +26,13 @@ public class ByteStringInterner {
     // TODO: how to release references when strings are no longer used. weak
     // references?
 
-    private static ConcurrentMap<ByteString, ByteString> map = new ConcurrentHashMap<ByteString, ByteString>();
+    private static final ConcurrentMap<ByteString, ByteString> map = new ConcurrentHashMap<ByteString, ByteString>();
 
     public static ByteString intern(ByteString in) {
-        ByteString presentValueInMap = map.get(in);
+        ByteString presentValueInMap = map.putIfAbsent(in, in);
         if (presentValueInMap != null) {
             return presentValueInMap;
         }
-
-        presentValueInMap = map.putIfAbsent(in, in);
-        if (presentValueInMap != null) {
-            return presentValueInMap;
-        }
-
         return in;
-
     }
 }