You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2015/09/10 00:32:21 UTC
[33/50] [abbrv] incubator-apex-malhar git commit: MLHR-1734 #resolve
Added idempotency changes for RabbitMQ input and output operator
MLHR-1734 #resolve Added idempotency changes for RabbitMQ input and output operator
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/13a3fbea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/13a3fbea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/13a3fbea
Branch: refs/heads/master
Commit: 13a3fbea74b7deaf674c5f42dd945ebd01e17f65
Parents: 8e94665
Author: ishark <is...@datatorrent.com>
Authored: Mon Jun 29 16:14:14 2015 -0700
Committer: ishark <is...@datatorrent.com>
Committed: Mon Aug 10 19:00:55 2015 -0700
----------------------------------------------------------------------
contrib/pom.xml | 6 +
.../rabbitmq/AbstractRabbitMQInputOperator.java | 141 +++++++++++++++++--
.../AbstractRabbitMQOutputOperator.java | 65 ++++++++-
...bstractSinglePortRabbitMQOutputOperator.java | 5 +-
.../rabbitmq/RabbitMQInputOperatorTest.java | 86 ++++++++---
.../rabbitmq/RabbitMQOutputOperatorTest.java | 28 ++--
6 files changed, 283 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/pom.xml b/contrib/pom.xml
index 9776e2f..76e8144 100755
--- a/contrib/pom.xml
+++ b/contrib/pom.xml
@@ -565,5 +565,11 @@
<version>${dt.framework.version}</version>
<type>jar</type>
</dependency>
+ <dependency>
+ <groupId>com.datatorrent</groupId>
+ <artifactId>dt-engine</artifactId>
+ <version>${dt.framework.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
index 06b3b88..e408f5e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
@@ -16,12 +16,22 @@
package com.datatorrent.contrib.rabbitmq;
import com.datatorrent.api.*;
-import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.util.KeyValPair;
+import com.datatorrent.netlet.util.DTThrowable;
import com.rabbitmq.client.*;
+
import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
+
import javax.validation.constraints.NotNull;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,9 +71,9 @@ import org.slf4j.LoggerFactory;
*
* @since 0.3.2
*/
-public abstract class AbstractRabbitMQInputOperator<T>
- implements InputOperator,
-Operator.ActivationListener<OperatorContext>
+public abstract class AbstractRabbitMQInputOperator<T> implements
+ InputOperator, Operator.ActivationListener<OperatorContext>,
+ Operator.CheckpointListener
{
private static final Logger logger = LoggerFactory.getLogger(AbstractRabbitMQInputOperator.class);
@NotNull
@@ -87,8 +97,23 @@ Operator.ActivationListener<OperatorContext>
protected transient Channel channel;
protected transient TracingConsumer tracingConsumer;
protected transient String cTag;
- protected transient ArrayBlockingQueue<byte[]> holdingBuffer;
+
+ protected transient ArrayBlockingQueue<KeyValPair<Long,byte[]>> holdingBuffer;
+ private IdempotentStorageManager idempotentStorageManager;
+ protected final transient Map<Long, byte[]> currentWindowRecoveryState;
+ private transient final Set<Long> pendingAck;
+ private transient final Set<Long> recoveredTags;
+ private transient long currentWindowId;
+ private transient int operatorContextId;
+
+ public AbstractRabbitMQInputOperator()
+ {
+ currentWindowRecoveryState = new HashMap<Long, byte[]>();
+ pendingAck = new HashSet<Long>();
+ recoveredTags = new HashSet<Long>();
+ }
+
/**
* define a consumer which can asynchronously receive data,
* and added to holdingBuffer
@@ -124,8 +149,19 @@ Operator.ActivationListener<OperatorContext>
@Override
public void handleDelivery(String consumer_Tag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
{
- holdingBuffer.add(body);
-// logger.debug("Received Async message:" + new String(body)+" buffersize:"+holdingBuffer.size());
+ long tag = envelope.getDeliveryTag();
+ if(envelope.isRedeliver() && (recoveredTags.contains(tag) || pendingAck.contains(tag)))
+ {
+ if(recoveredTags.contains(tag)) {
+ pendingAck.add(tag);
+ }
+ return;
+ }
+
+ // Acknowledgements are sent at the end of the window after adding to idempotency manager
+ pendingAck.add(tag);
+ holdingBuffer.add(new KeyValPair<Long, byte[]>(tag, body));
+ logger.debug("Received Async message: {} buffersize: {} ", new String(body), holdingBuffer.size());
}
}
@@ -137,7 +173,9 @@ Operator.ActivationListener<OperatorContext>
ntuples = holdingBuffer.size();
}
for (int i = ntuples; i-- > 0;) {
- emitTuple(holdingBuffer.poll());
+ KeyValPair<Long, byte[]> message = holdingBuffer.poll();
+ currentWindowRecoveryState.put(message.getKey(), message.getValue());
+ emitTuple(message.getValue());
}
}
@@ -146,22 +184,72 @@ Operator.ActivationListener<OperatorContext>
@Override
public void beginWindow(long windowId)
{
+ currentWindowId = windowId;
+ if (windowId <= this.idempotentStorageManager.getLargestRecoveryWindow()) {
+ replay(windowId);
+ }
}
+ @SuppressWarnings("unchecked")
+ private void replay(long windowId) {
+ Map<Long, byte[]> recoveredData;
+ try {
+ recoveredData = (Map<Long, byte[]>) this.idempotentStorageManager.load(operatorContextId, windowId);
+ if (recoveredData == null) {
+ return;
+ }
+ for (Entry<Long, byte[]> recoveredEntry : recoveredData.entrySet()) {
+ recoveredTags.add(recoveredEntry.getKey());
+ emitTuple(recoveredEntry.getValue());
+ }
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+
+
@Override
public void endWindow()
{
+ //No more messages can be consumed now. so we will call emit tuples once more
+ //so that any pending messages can be emitted.
+ KeyValPair<Long, byte[]> message;
+ while ((message = holdingBuffer.poll()) != null) {
+ currentWindowRecoveryState.put(message.getKey(), message.getValue());
+ emitTuple(message.getValue());
+ }
+
+ try {
+ this.idempotentStorageManager.save(currentWindowRecoveryState, operatorContextId, currentWindowId);
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+
+ currentWindowRecoveryState.clear();
+
+ for (Long deliveryTag : pendingAck) {
+ try {
+ channel.basicAck(deliveryTag, false);
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+
+ pendingAck.clear();
}
@Override
public void setup(OperatorContext context)
{
- holdingBuffer = new ArrayBlockingQueue<byte[]>(bufferSize);
+ this.operatorContextId = context.getId();
+ holdingBuffer = new ArrayBlockingQueue<KeyValPair<Long, byte[]>>(bufferSize);
+ this.idempotentStorageManager.setup(context);
}
@Override
public void teardown()
{
+ this.idempotentStorageManager.teardown();
}
@Override
@@ -178,10 +266,12 @@ Operator.ActivationListener<OperatorContext>
channel = connection.createChannel();
channel.exchangeDeclare(exchange, exchangeType);
+ boolean resetQueueName = false;
if (queueName == null){
// unique queuename is generated
// used in case of fanout exchange
queueName = channel.queueDeclare().getQueue();
+ resetQueueName = true;
} else {
// user supplied name
// used in case of direct exchange
@@ -193,7 +283,11 @@ Operator.ActivationListener<OperatorContext>
// consumer = new QueueingConsumer(channel);
// channel.basicConsume(queueName, true, consumer);
tracingConsumer = new TracingConsumer(channel);
- cTag = channel.basicConsume(queueName, true, tracingConsumer);
+ cTag = channel.basicConsume(queueName, false, tracingConsumer);
+ if(resetQueueName)
+ {
+ queueName = null;
+ }
}
catch (IOException ex) {
throw new RuntimeException("Connection Failure", ex);
@@ -211,6 +305,23 @@ Operator.ActivationListener<OperatorContext>
logger.debug(ex.toString());
}
}
+
+ @Override
+ public void checkpointed(long windowId)
+ {
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ try {
+ idempotentStorageManager.deleteUpTo(operatorContextId, windowId);
+ }
+ catch (IOException e) {
+ throw new RuntimeException("committing", e);
+ }
+ }
+
public void setTupleBlast(int i)
{
this.tuple_blast = i;
@@ -275,5 +386,15 @@ Operator.ActivationListener<OperatorContext>
{
this.routingKey = routingKey;
}
+
+ public IdempotentStorageManager getIdempotentStorageManager() {
+ return idempotentStorageManager;
+ }
+
+ public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) {
+ this.idempotentStorageManager = idempotentStorageManager;
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
index a78febb..cc6b7db 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java
@@ -16,12 +16,16 @@
package com.datatorrent.contrib.rabbitmq;
import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.netlet.util.DTThrowable;
import com.datatorrent.api.Context.OperatorContext;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
+
import java.io.IOException;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,21 +70,70 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator
transient Channel channel = null;
transient String exchange = "testEx";
transient String queueName="testQ";
+
+ private IdempotentStorageManager idempotentStorageManager;
+ private transient long currentWindowId;
+ private transient long largestRecoveryWindowId;
+ private transient int operatorContextId;
+ protected transient boolean skipProcessingTuple = false;
+ private transient OperatorContext context;
+
@Override
public void setup(OperatorContext context)
{
+ // Needed to setup idempotency storage manager in setter
+ this.context = context;
+ this.operatorContextId = context.getId();
+
try {
connFactory.setHost("localhost");
connection = connFactory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare(exchange, "fanout");
-// channel.queueDeclare(queueName, false, false, false, null);
+
+ this.idempotentStorageManager.setup(context);
+
}
catch (IOException ex) {
logger.debug(ex.toString());
+ DTThrowable.rethrow(ex);
+ }
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ currentWindowId = windowId;
+ largestRecoveryWindowId = idempotentStorageManager.getLargestRecoveryWindow();
+ if (windowId <= largestRecoveryWindowId) {
+ // Do not resend already sent tuples
+ skipProcessingTuple = true;
+ }
+ else
+ {
+ skipProcessingTuple = false;
}
}
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void endWindow()
+ {
+ if(currentWindowId < largestRecoveryWindowId)
+ {
+ // ignore
+ return;
+ }
+ try {
+ idempotentStorageManager.save("processedWindow", operatorContextId, currentWindowId);
+ } catch (IOException e) {
+ DTThrowable.rethrow(e);
+ }
+ }
+
public void setQueueName(String queueName) {
this.queueName = queueName;
@@ -95,9 +148,19 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator
try {
channel.close();
connection.close();
+ this.idempotentStorageManager.teardown();
}
catch (IOException ex) {
logger.debug(ex.toString());
}
}
+
+ public IdempotentStorageManager getIdempotentStorageManager() {
+ return idempotentStorageManager;
+ }
+
+ public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) {
+ this.idempotentStorageManager = idempotentStorageManager;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java
index c16f70f..8e6ff39 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractSinglePortRabbitMQOutputOperator.java
@@ -60,7 +60,10 @@ public abstract class AbstractSinglePortRabbitMQOutputOperator<T> extends Abstra
@Override
public void process(T tuple)
{
- processTuple(tuple); // This is an abstract call
+ if(!skipProcessingTuple)
+ {
+ processTuple(tuple); // This is an abstract call
+ }
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
index 041e362..a14f4e7 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java
@@ -25,17 +25,21 @@ import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
+import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.contrib.helper.CollectorModule;
import com.datatorrent.contrib.helper.MessageQueueTestHelper;
-
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Attribute;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.LocalMode;
-
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.testbench.CollectorTestSink;
import com.datatorrent.netlet.util.DTThrowable;
/**
@@ -44,7 +48,7 @@ import com.datatorrent.netlet.util.DTThrowable;
public class RabbitMQInputOperatorTest
{
private static Logger logger = LoggerFactory.getLogger(RabbitMQInputOperatorTest.class);
-
+
public static final class TestStringRabbitMQInputOperator extends AbstractSinglePortRabbitMQInputOperator<String>
{
@Override
@@ -75,7 +79,6 @@ public class RabbitMQInputOperatorTest
connection = connFactory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare(exchange, "fanout");
-// channel.queueDeclare(queueName, false, false, false, null);
}
public void setQueueName(String queueName)
@@ -86,9 +89,7 @@ public class RabbitMQInputOperatorTest
public void process(Object message) throws IOException
{
String msg = message.toString();
-// logger.debug("publish:" + msg);
channel.basicPublish(exchange, "", null, msg.getBytes());
-// channel.basicPublish("", queueName, null, msg.getBytes());
}
public void teardown() throws IOException
@@ -100,12 +101,11 @@ public class RabbitMQInputOperatorTest
public void generateMessages(int msgCount) throws InterruptedException, IOException
{
for (int i = 0; i < msgCount; i++) {
-
- ArrayList<HashMap<String, Integer>> dataMaps = MessageQueueTestHelper.getMessages();
- for(int j =0; j < dataMaps.size(); j++)
- {
- process(dataMaps.get(j));
- }
+
+ ArrayList<HashMap<String, Integer>> dataMaps = MessageQueueTestHelper.getMessages();
+ for (int j = 0; j < dataMaps.size(); j++) {
+ process(dataMaps.get(j));
+ }
}
}
@@ -124,6 +124,8 @@ public class RabbitMQInputOperatorTest
LocalMode lma = LocalMode.newInstance();
DAG dag = lma.getDAG();
RabbitMQInputOperator consumer = dag.addOperator("Consumer", RabbitMQInputOperator.class);
+ consumer.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+
final CollectorModule<byte[]> collector = dag.addOperator("Collector", new CollectorModule<byte[]>());
consumer.setHost("localhost");
@@ -144,7 +146,7 @@ public class RabbitMQInputOperatorTest
public void run()
{
long startTms = System.currentTimeMillis();
- long timeout = 10000L;
+ long timeout = 100000L;
try {
while (!collector.inputPort.collections.containsKey("collector") && System.currentTimeMillis() - startTms < timeout) {
Thread.sleep(500);
@@ -153,16 +155,14 @@ public class RabbitMQInputOperatorTest
startTms = System.currentTimeMillis();
while (System.currentTimeMillis() - startTms < timeout) {
List<?> list = collector.inputPort.collections.get("collector");
-
+
if (list.size() < testNum * 3) {
Thread.sleep(10);
- }
- else {
+ } else {
break;
}
}
- }
- catch (IOException ex) {
+ } catch (IOException ex) {
logger.error(ex.getMessage(), ex);
DTThrowable.rethrow(ex);
} catch (InterruptedException ex) {
@@ -179,5 +179,53 @@ public class RabbitMQInputOperatorTest
logger.debug("collection size: {} {}", collector.inputPort.collections.size(), collector.inputPort.collections);
MessageQueueTestHelper.validateResults(testNum, collector.inputPort.collections);
- }
+ }
+
+ @Test
+ public void testRecoveryAndIdempotency() throws Exception
+ {
+ RabbitMQInputOperator operator = new RabbitMQInputOperator();
+ operator.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+ operator.setHost("localhost");
+ operator.setExchange("testEx");
+ operator.setExchangeType("fanout");
+
+ Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+ CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
+
+ operator.outputPort.setSink(sink);
+ OperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap);
+
+ operator.setup(context);
+ operator.activate(context);
+
+ final RabbitMQMessageGenerator publisher = new RabbitMQMessageGenerator();
+ publisher.setup();
+ publisher.generateMessages(5);
+
+ Thread.sleep(10000);
+
+ operator.beginWindow(1);
+ operator.emitTuples();
+ operator.endWindow();
+
+ operator.deactivate();
+ Assert.assertEquals("num of messages in window 1", 15, sink.collectedTuples.size());
+
+ // failure and then re-deployment of operator
+ sink.collectedTuples.clear();
+ operator.setup(context);
+ operator.activate(context);
+
+ Assert.assertEquals("largest recovery window", 1, operator.getIdempotentStorageManager().getLargestRecoveryWindow());
+ operator.beginWindow(1);
+ operator.endWindow();
+ Assert.assertEquals("num of messages in window 1", 15, sink.collectedTuples.size());
+ sink.collectedTuples.clear();
+
+ operator.deactivate();
+ operator.teardown();
+ operator.getIdempotentStorageManager().deleteUpTo(context.getId(), 1);
+ publisher.teardown();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/13a3fbea/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
index a170a0e..27213c3 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java
@@ -27,7 +27,7 @@ import org.junit.Test;
import org.slf4j.LoggerFactory;
import com.datatorrent.contrib.helper.SourceModule;
-
+import com.datatorrent.lib.io.IdempotentStorageManager;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.LocalMode;
@@ -45,7 +45,7 @@ public class RabbitMQOutputOperatorTest
public int count = 0;
private final String host = "localhost";
ConnectionFactory connFactory = new ConnectionFactory();
-// QueueingConsumer consumer = null;
+ // QueueingConsumer consumer = null;
Connection connection = null;
Channel channel = null;
TracingConsumer tracingConsumer = null;
@@ -64,8 +64,6 @@ public class RabbitMQOutputOperatorTest
queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchange, "");
-// consumer = new QueueingConsumer(channel);
-// channel.basicConsume(queueName, true, consumer);
tracingConsumer = new TracingConsumer(channel);
cTag = channel.basicConsume(queueName, true, tracingConsumer);
}
@@ -125,7 +123,6 @@ public class RabbitMQOutputOperatorTest
}
}
-
@Test
public void testDag() throws InterruptedException, MalformedURLException, IOException, Exception
{
@@ -133,7 +130,7 @@ public class RabbitMQOutputOperatorTest
runTest(testNum);
logger.debug("end of test");
}
-
+
protected void runTest(int testNum) throws IOException
{
RabbitMQMessageReceiver receiver = new RabbitMQMessageReceiver();
@@ -144,23 +141,22 @@ public class RabbitMQOutputOperatorTest
SourceModule source = dag.addOperator("source", new SourceModule());
source.setTestNum(testNum);
RabbitMQOutputOperator collector = dag.addOperator("generator", new RabbitMQOutputOperator());
-
+ collector.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+
collector.setExchange("testEx");
dag.addStream("Stream", source.outPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);
final LocalMode.Controller lc = lma.getController();
lc.setHeartbeatMonitoringEnabled(false);
lc.runAsync();
- try {
+ try {
Thread.sleep(1000);
long timeout = 10000L;
long startTms = System.currentTimeMillis();
- while((receiver.count < testNum * 3) && (System.currentTimeMillis() - startTms < timeout))
- {
+ while ((receiver.count < testNum * 3) && (System.currentTimeMillis() - startTms < timeout)) {
Thread.sleep(100);
- }
- }
- catch (InterruptedException ex) {
+ }
+ } catch (InterruptedException ex) {
Assert.fail(ex.getMessage());
} finally {
lc.shutdown();
@@ -170,11 +166,9 @@ public class RabbitMQOutputOperatorTest
for (Map.Entry<String, Integer> e : receiver.dataMap.entrySet()) {
if (e.getKey().equals("a")) {
Assert.assertEquals("emitted value for 'a' was ", new Integer(2), e.getValue());
- }
- else if (e.getKey().equals("b")) {
+ } else if (e.getKey().equals("b")) {
Assert.assertEquals("emitted value for 'b' was ", new Integer(20), e.getValue());
- }
- else if (e.getKey().equals("c")) {
+ } else if (e.getKey().equals("c")) {
Assert.assertEquals("emitted value for 'c' was ", new Integer(1000), e.getValue());
}
}