You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:32:21 UTC

[33/50] [abbrv] incubator-apex-malhar git commit: MLHR-1734 #resolve Added idempotency changes for RabbitMQ input and output operator

MLHR-1734 #resolve Added idempotency changes for RabbitMQ input and output operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/13a3fbea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/13a3fbea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/13a3fbea

Branch: refs/heads/master
Commit: 13a3fbea74b7deaf674c5f42dd945ebd01e17f65
Parents: 8e94665
Author: ishark <is...@datatorrent.com>
Authored: Mon Jun 29 16:14:14 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Mon Aug 10 19:00:55 2015 -0700

----------------------------------------------------------------------
 contrib/pom.xml                                 |   6 +
 .../rabbitmq/AbstractRabbitMQInputOperator.java | 141 +++++++++++++++++--
 .../AbstractRabbitMQOutputOperator.java         |  65 ++++++++-
 ...bstractSinglePortRabbitMQOutputOperator.java |   5 +-
 .../rabbitmq/RabbitMQInputOperatorTest.java     |  86 ++++++++---
 .../rabbitmq/RabbitMQOutputOperatorTest.java    |  28 ++--
 6 files changed, 283 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 9776e2f..76e8144 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -565,5 +565,11 @@
       <version>${dt.framework.version}</version>
       <type>jar</type>
     </dependency>
+    <dependency>
+      <groupId>com.datatorrent</groupId>
+      <artifactId>dt-engine</artifactId>
+      <version>${dt.framework.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
index 06b3b88..e408f5e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
@@ -16,12 +16,22 @@
 package com.datatorrent.contrib.rabbitmq;
 
 import com.datatorrent.api.*;
-import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.netlet.util.DTThrowable;
 import com.rabbitmq.client.*;
+
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
+
 import javax.validation.constraints.NotNull;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,9 +71,9 @@ import org.slf4j.LoggerFactory;
  *
  * @since 0.3.2
  */
-public abstract class AbstractRabbitMQInputOperator<T>
-    implements InputOperator,
-Operator.ActivationListener<OperatorContext>
+public abstract class AbstractRabbitMQInputOperator<T> implements
+    InputOperator, Operator.ActivationListener<OperatorContext>,
+    Operator.CheckpointListener
 {
   private static final Logger logger = LoggerFactory.getLogger(AbstractRabbitMQInputOperator.class);
   @NotNull
@@ -87,8 +97,23 @@ Operator.ActivationListener<OperatorContext>
   protected transient Channel channel;
   protected transient TracingConsumer tracingConsumer;
   protected transient String cTag;
-  protected transient ArrayBlockingQueue<byte[]> holdingBuffer;
+  
+  protected transient ArrayBlockingQueue<KeyValPair<Long,byte[]>> holdingBuffer;
+  private IdempotentStorageManager idempotentStorageManager;
+  protected final transient Map<Long, byte[]> currentWindowRecoveryState;
+  private transient final Set<Long> pendingAck;
+  private transient final Set<Long> recoveredTags;
+  private transient long currentWindowId;
+  private transient int operatorContextId;
+  
+  public AbstractRabbitMQInputOperator()
+  {
+    currentWindowRecoveryState = new HashMap<Long, byte[]>();
+    pendingAck = new HashSet<Long>();
+    recoveredTags = new HashSet<Long>();
+  }
 
+  
 /**
  * define a consumer which can asynchronously receive data,
  * and added to holdingBuffer
@@ -124,8 +149,19 @@ Operator.ActivationListener<OperatorContext>
     @Override
     public void handleDelivery(String consumer_Tag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
     {
-      holdingBuffer.add(body);
-//      logger.debug("Received Async message:" + new String(body)+" buffersize:"+holdingBuffer.size());
+      long tag = envelope.getDeliveryTag();
+      if(envelope.isRedeliver() && (recoveredTags.contains(tag) || pendingAck.contains(tag)))
+      {
+        if(recoveredTags.contains(tag)) {
+          pendingAck.add(tag);
+        }
+        return;
+      }
+      
+      // Acknowledgements are sent at the end of the window after adding to idempotency manager
+      pendingAck.add(tag);
+      holdingBuffer.add(new KeyValPair<Long, byte[]>(tag, body));
+      logger.debug("Received Async message: {}  buffersize: {} ", new String(body), holdingBuffer.size());
     }
   }
 
@@ -137,7 +173,9 @@ Operator.ActivationListener<OperatorContext>
       ntuples = holdingBuffer.size();
     }
     for (int i = ntuples; i-- > 0;) {
-      emitTuple(holdingBuffer.poll());
+      KeyValPair<Long, byte[]> message =  holdingBuffer.poll();
+      currentWindowRecoveryState.put(message.getKey(), message.getValue());
+      emitTuple(message.getValue());
     }
   }
 
@@ -146,22 +184,72 @@ Operator.ActivationListener<OperatorContext>
   @Override
   public void beginWindow(long windowId)
   {
+    currentWindowId = windowId;
+    if (windowId <= this.idempotentStorageManager.getLargestRecoveryWindow()) {
+      replay(windowId);
+    }
   }
 
+  @SuppressWarnings("unchecked")
+  private void replay(long windowId) {      
+    Map<Long, byte[]> recoveredData;
+    try {
+      recoveredData = (Map<Long, byte[]>) this.idempotentStorageManager.load(operatorContextId, windowId);
+      if (recoveredData == null) {
+        return;
+      }
+      for (Entry<Long, byte[]>  recoveredEntry : recoveredData.entrySet()) {
+        recoveredTags.add(recoveredEntry.getKey());
+        emitTuple(recoveredEntry.getValue());
+      }
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+  }
+
+  
   @Override
   public void endWindow()
   {
+    //No more messages can be consumed now. so we will call emit tuples once more
+    //so that any pending messages can be emitted.
+    KeyValPair<Long, byte[]> message;
+    while ((message = holdingBuffer.poll()) != null) {
+      currentWindowRecoveryState.put(message.getKey(), message.getValue());
+      emitTuple(message.getValue());      
+    }
+    
+    try {
+      this.idempotentStorageManager.save(currentWindowRecoveryState, operatorContextId, currentWindowId);
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+    
+    currentWindowRecoveryState.clear();
+    
+    for (Long deliveryTag : pendingAck) {
+      try {
+        channel.basicAck(deliveryTag, false);
+      } catch (IOException e) {        
+        DTThrowable.rethrow(e);
+      }
+    }
+    
+    pendingAck.clear();
   }
 
   @Override
   public void setup(OperatorContext context)
   {
-    holdingBuffer = new ArrayBlockingQueue<byte[]>(bufferSize);
+    this.operatorContextId = context.getId();
+    holdingBuffer = new ArrayBlockingQueue<KeyValPair<Long, byte[]>>(bufferSize);
+    this.idempotentStorageManager.setup(context);
   }
 
   @Override
   public void teardown()
   {
+    this.idempotentStorageManager.teardown();
   }
 
   @Override
@@ -178,10 +266,12 @@ Operator.ActivationListener<OperatorContext>
       channel = connection.createChannel();
 
       channel.exchangeDeclare(exchange, exchangeType);
+      boolean resetQueueName = false;
       if (queueName == null){
         // unique queuename is generated
         // used in case of fanout exchange
         queueName = channel.queueDeclare().getQueue();
+        resetQueueName = true;
       } else {
         // user supplied name
         // used in case of direct exchange
@@ -193,7 +283,11 @@ Operator.ActivationListener<OperatorContext>
 //      consumer = new QueueingConsumer(channel);
 //      channel.basicConsume(queueName, true, consumer);
       tracingConsumer = new TracingConsumer(channel);
-      cTag = channel.basicConsume(queueName, true, tracingConsumer);
+      cTag = channel.basicConsume(queueName, false, tracingConsumer);
+      if(resetQueueName)
+      {
+        queueName = null;
+      }
     }
     catch (IOException ex) {
       throw new RuntimeException("Connection Failure", ex);
@@ -211,6 +305,23 @@ Operator.ActivationListener<OperatorContext>
       logger.debug(ex.toString());
     }
   }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    try {
+      idempotentStorageManager.deleteUpTo(operatorContextId, windowId);
+    }
+    catch (IOException e) {
+      throw new RuntimeException("committing", e);
+    }
+  }
+
   public void setTupleBlast(int i)
   {
     this.tuple_blast = i;
@@ -275,5 +386,15 @@ Operator.ActivationListener<OperatorContext>
   {
     this.routingKey = routingKey;
   }
+  
+  public IdempotentStorageManager getIdempotentStorageManager() {
+    return idempotentStorageManager;
+  }
+  
+  public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) {
+    this.idempotentStorageManager = idempotentStorageManager;
+  }
+  
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
index a78febb..cc6b7db 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
@@ -16,12 +16,16 @@
 package com.datatorrent.contrib.rabbitmq;
 
 import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.netlet.util.DTThrowable;
 import com.datatorrent.api.Context.OperatorContext;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.QueueingConsumer;
+
 import java.io.IOException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,21 +70,70 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator
   transient Channel channel = null;
   transient String exchange = "testEx";
   transient String queueName="testQ";
+  
+  private IdempotentStorageManager idempotentStorageManager;  
+  private transient long currentWindowId;
+  private transient long largestRecoveryWindowId;
+  private transient int operatorContextId;
+  protected transient boolean skipProcessingTuple = false;
+  private transient OperatorContext context;
+
 
   @Override
   public void setup(OperatorContext context)
   {
+    // Needed to setup idempotency storage manager in setter 
+    this.context = context;
+    this.operatorContextId = context.getId();
+
     try {
       connFactory.setHost("localhost");
       connection = connFactory.newConnection();
       channel = connection.createChannel();
       channel.exchangeDeclare(exchange, "fanout");
-//      channel.queueDeclare(queueName, false, false, false, null);
+
+      this.idempotentStorageManager.setup(context);
+
     }
     catch (IOException ex) {
       logger.debug(ex.toString());
+      DTThrowable.rethrow(ex);
+    }
+  }
+  
+  @Override
+  public void beginWindow(long windowId)
+  {
+    currentWindowId = windowId;    
+    largestRecoveryWindowId = idempotentStorageManager.getLargestRecoveryWindow();
+    if (windowId <= largestRecoveryWindowId) {
+      // Do not resend already sent tuples
+      skipProcessingTuple = true;
+    }
+    else
+    {
+      skipProcessingTuple = false;
     }
   }
+  
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void endWindow()
+  {
+    if(currentWindowId < largestRecoveryWindowId)
+    {
+      // ignore
+      return;
+    }
+    try {
+      idempotentStorageManager.save("processedWindow", operatorContextId, currentWindowId);
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
+  }
+
 
   public void setQueueName(String queueName) {
     this.queueName = queueName;
@@ -95,9 +148,19 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator
     try {
       channel.close();
       connection.close();
+      this.idempotentStorageManager.teardown();
     }
     catch (IOException ex) {
       logger.debug(ex.toString());
     }
   }
+  
+  public IdempotentStorageManager getIdempotentStorageManager() {
+    return idempotentStorageManager;
+  }
+  
+  public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) {    
+    this.idempotentStorageManager = idempotentStorageManager;    
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java
index c16f70f..8e6ff39 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java
@@ -60,7 +60,10 @@ public abstract class AbstractSinglePortRabbitMQOutputOperator<T> extends Abstra
     @Override
     public void process(T tuple)
     {
-      processTuple(tuple); // This is an abstract call
+      if(!skipProcessingTuple)
+      {
+        processTuple(tuple); // This is an abstract call
+      }
     }
   };
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
index 041e362..a14f4e7 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
@@ -25,17 +25,21 @@ import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.QueueingConsumer;
 
+import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datatorrent.contrib.helper.CollectorModule;
 import com.datatorrent.contrib.helper.MessageQueueTestHelper;
-
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Attribute;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.LocalMode;
-
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**
@@ -44,7 +48,7 @@ import com.datatorrent.netlet.util.DTThrowable;
 public class RabbitMQInputOperatorTest
 {
   private static Logger logger = LoggerFactory.getLogger(RabbitMQInputOperatorTest.class);
-  
+
   public static final class TestStringRabbitMQInputOperator extends AbstractSinglePortRabbitMQInputOperator<String>
   {
     @Override
@@ -75,7 +79,6 @@ public class RabbitMQInputOperatorTest
       connection = connFactory.newConnection();
       channel = connection.createChannel();
       channel.exchangeDeclare(exchange, "fanout");
-//      channel.queueDeclare(queueName, false, false, false, null);
     }
 
     public void setQueueName(String queueName)
@@ -86,9 +89,7 @@ public class RabbitMQInputOperatorTest
     public void process(Object message) throws IOException
     {
       String msg = message.toString();
-//      logger.debug("publish:" + msg);
       channel.basicPublish(exchange, "", null, msg.getBytes());
-//      channel.basicPublish("", queueName, null, msg.getBytes());
     }
 
     public void teardown() throws IOException
@@ -100,12 +101,11 @@ public class RabbitMQInputOperatorTest
     public void generateMessages(int msgCount) throws InterruptedException, IOException
     {
       for (int i = 0; i < msgCount; i++) {
-        
-        ArrayList<HashMap<String, Integer>>  dataMaps = MessageQueueTestHelper.getMessages();
-        for(int j =0; j < dataMaps.size(); j++)
-        {
-          process(dataMaps.get(j));  
-        }        
+
+        ArrayList<HashMap<String, Integer>> dataMaps = MessageQueueTestHelper.getMessages();
+        for (int j = 0; j < dataMaps.size(); j++) {
+          process(dataMaps.get(j));
+        }
       }
     }
 
@@ -124,6 +124,8 @@ public class RabbitMQInputOperatorTest
     LocalMode lma = LocalMode.newInstance();
     DAG dag = lma.getDAG();
     RabbitMQInputOperator consumer = dag.addOperator("Consumer", RabbitMQInputOperator.class);
+    consumer.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+
     final CollectorModule<byte[]> collector = dag.addOperator("Collector", new CollectorModule<byte[]>());
 
     consumer.setHost("localhost");
@@ -144,7 +146,7 @@ public class RabbitMQInputOperatorTest
       public void run()
       {
         long startTms = System.currentTimeMillis();
-        long timeout = 10000L;
+        long timeout = 100000L;
         try {
           while (!collector.inputPort.collections.containsKey("collector") && System.currentTimeMillis() - startTms < timeout) {
             Thread.sleep(500);
@@ -153,16 +155,14 @@ public class RabbitMQInputOperatorTest
           startTms = System.currentTimeMillis();
           while (System.currentTimeMillis() - startTms < timeout) {
             List<?> list = collector.inputPort.collections.get("collector");
-            
+
             if (list.size() < testNum * 3) {
               Thread.sleep(10);
-            }
-            else {
+            } else {
               break;
             }
           }
-        }
-        catch (IOException ex) {
+        } catch (IOException ex) {
           logger.error(ex.getMessage(), ex);
           DTThrowable.rethrow(ex);
         } catch (InterruptedException ex) {
@@ -179,5 +179,53 @@ public class RabbitMQInputOperatorTest
     logger.debug("collection size: {} {}", collector.inputPort.collections.size(), collector.inputPort.collections);
 
     MessageQueueTestHelper.validateResults(testNum, collector.inputPort.collections);
-  }  
+  }
+
+  @Test
+  public void testRecoveryAndIdempotency() throws Exception
+  {
+    RabbitMQInputOperator operator = new RabbitMQInputOperator();
+    operator.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+    operator.setHost("localhost");
+    operator.setExchange("testEx");
+    operator.setExchangeType("fanout");
+
+    Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+    CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
+
+    operator.outputPort.setSink(sink);
+    OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap);
+
+    operator.setup(context);
+    operator.activate(context);
+
+    final RabbitMQMessageGenerator publisher = new RabbitMQMessageGenerator();
+    publisher.setup();
+    publisher.generateMessages(5);
+
+    Thread.sleep(10000);
+
+    operator.beginWindow(1);
+    operator.emitTuples();
+    operator.endWindow();
+
+    operator.deactivate();
+    Assert.assertEquals("num of messages in window 1", 15, sink.collectedTuples.size());
+
+    // failure and then re-deployment of operator
+    sink.collectedTuples.clear();
+    operator.setup(context);
+    operator.activate(context);
+
+    Assert.assertEquals("largest recovery window", 1, operator.getIdempotentStorageManager().getLargestRecoveryWindow());
+    operator.beginWindow(1);
+    operator.endWindow();
+    Assert.assertEquals("num of messages in window 1", 15, sink.collectedTuples.size());
+    sink.collectedTuples.clear();
+
+    operator.deactivate();
+    operator.teardown();
+    operator.getIdempotentStorageManager().deleteUpTo(context.getId(), 1);
+    publisher.teardown();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
index a170a0e..27213c3 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
@@ -27,7 +27,7 @@ import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
 import com.datatorrent.contrib.helper.SourceModule;
-
+import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.LocalMode;
@@ -45,7 +45,7 @@ public class RabbitMQOutputOperatorTest
     public int count = 0;
     private final String host = "localhost";
     ConnectionFactory connFactory = new ConnectionFactory();
-//  QueueingConsumer consumer = null;
+    // QueueingConsumer consumer = null;
     Connection connection = null;
     Channel channel = null;
     TracingConsumer tracingConsumer = null;
@@ -64,8 +64,6 @@ public class RabbitMQOutputOperatorTest
       queueName = channel.queueDeclare().getQueue();
       channel.queueBind(queueName, exchange, "");
 
-//      consumer = new QueueingConsumer(channel);
-//      channel.basicConsume(queueName, true, consumer);
       tracingConsumer = new TracingConsumer(channel);
       cTag = channel.basicConsume(queueName, true, tracingConsumer);
     }
@@ -125,7 +123,6 @@ public class RabbitMQOutputOperatorTest
     }
   }
 
-
   @Test
   public void testDag() throws InterruptedException, MalformedURLException, IOException, Exception
   {
@@ -133,7 +130,7 @@ public class RabbitMQOutputOperatorTest
     runTest(testNum);
     logger.debug("end of test");
   }
-  
+
   protected void runTest(int testNum) throws IOException
   {
     RabbitMQMessageReceiver receiver = new RabbitMQMessageReceiver();
@@ -144,23 +141,22 @@ public class RabbitMQOutputOperatorTest
     SourceModule source = dag.addOperator("source", new SourceModule());
     source.setTestNum(testNum);
     RabbitMQOutputOperator collector = dag.addOperator("generator", new RabbitMQOutputOperator());
-    
+    collector.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+
     collector.setExchange("testEx");
     dag.addStream("Stream", source.outPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);
 
     final LocalMode.Controller lc = lma.getController();
     lc.setHeartbeatMonitoringEnabled(false);
     lc.runAsync();
-    try {      
+    try {
       Thread.sleep(1000);
       long timeout = 10000L;
       long startTms = System.currentTimeMillis();
-      while((receiver.count < testNum * 3) && (System.currentTimeMillis() - startTms < timeout))
-      {
+      while ((receiver.count < testNum * 3) && (System.currentTimeMillis() - startTms < timeout)) {
         Thread.sleep(100);
-      } 
-    }
-    catch (InterruptedException ex) {
+      }
+    } catch (InterruptedException ex) {
       Assert.fail(ex.getMessage());
     } finally {
       lc.shutdown();
@@ -170,11 +166,9 @@ public class RabbitMQOutputOperatorTest
     for (Map.Entry<String, Integer> e : receiver.dataMap.entrySet()) {
       if (e.getKey().equals("a")) {
         Assert.assertEquals("emitted value for 'a' was ", new Integer(2), e.getValue());
-      }
-      else if (e.getKey().equals("b")) {
+      } else if (e.getKey().equals("b")) {
         Assert.assertEquals("emitted value for 'b' was ", new Integer(20), e.getValue());
-      }
-      else if (e.getKey().equals("c")) {
+      } else if (e.getKey().equals("c")) {
         Assert.assertEquals("emitted value for 'c' was ", new Integer(1000), e.getValue());
       }
     }