You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2016/08/09 23:58:13 UTC
[1/2] apex-malhar git commit: JMS Input operator changes to support
SQS and ActiveMQ Incorporate comments: javadocs,
NotNull constraint for the builder and create function to return the builder
ref
Repository: apex-malhar
Updated Branches:
refs/heads/master 5972bca41 -> 37cb58484
JMS Input operator changes to support SQS and ActiveMQ
Incorporate comments: javadocs, NotNull constraint for the builder and create function to return the builder ref
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/d46a6be6
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/d46a6be6
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/d46a6be6
Branch: refs/heads/master
Commit: d46a6be62d2b70bb99ab7ca02d0a87ea2d5eba38
Parents: 7dea3d0
Author: Sanjay Pujare <sa...@Sanjay-DT-MacBook-Pro.local>
Authored: Mon Aug 8 12:40:05 2016 -0700
Committer: Sanjay Pujare <sa...@Sanjay-DT-MacBook-Pro.local>
Committed: Tue Aug 9 16:38:13 2016 -0700
----------------------------------------------------------------------
library/pom.xml | 34 ++-
.../lib/io/jms/AbstractJMSInputOperator.java | 10 +
.../com/datatorrent/lib/io/jms/JMSBase.java | 214 +++++++++++++--
.../lib/io/jms/JMSObjectInputOperatorTest.java | 1 +
.../lib/io/jms/JMSStringInputOperatorTest.java | 2 +
.../lib/io/jms/SQSStringInputOperatorTest.java | 262 +++++++++++++++++++
.../com/datatorrent/lib/io/jms/SQSTestBase.java | 187 +++++++++++++
.../src/test/resources/sqsdevCreds.properties | 21 ++
.../src/test/resources/sqstestCreds.properties | 21 ++
9 files changed, 719 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/pom.xml
----------------------------------------------------------------------
diff --git a/library/pom.xml b/library/pom.xml
index 8d264a4..8fe30d0 100644
--- a/library/pom.xml
+++ b/library/pom.xml
@@ -306,12 +306,18 @@
<artifactId>janino</artifactId>
<version>2.7.8</version>
<scope>test</scope>
- </dependency>
- <dependency>
+ </dependency>
+ <dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.8.5</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
@@ -346,9 +352,27 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>2.5.4</version>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-sqs</artifactId>
+ <version>1.10.73</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.5.4</version>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>amazon-sqs-java-messaging-lib</artifactId>
+ <version>1.0.0</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-sqs</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
index cc27c88..bf0fe5c 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
@@ -514,6 +514,16 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase
return this.windowDataManager;
}
+ /**
+ * Sets this transacted value
+ *
+ * @param value new value for transacted
+ */
+ public void setTransacted(boolean value)
+ {
+ transacted = value;
+ }
+
protected abstract void emit(T payload);
public static enum CounterKeys
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java b/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java
index 48ed2c3..772464a 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java
@@ -25,6 +25,7 @@ import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
+import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +34,8 @@ import org.apache.commons.beanutils.BeanUtils;
import com.google.common.collect.Maps;
+import com.datatorrent.netlet.util.DTThrowable;
+
/**
* Base class for any JMS input or output adapter operator.
* <p/>
@@ -76,11 +79,11 @@ public class JMSBase
private transient Session session;
private transient Destination destination;
- private String connectionFactoryClass;
- private Map<String, String> connectionFactoryProperties = Maps.newHashMap();
+ @NotNull
+ private ConnectionFactoryBuilder connectionFactoryBuilder;
private String ackMode = "CLIENT_ACKNOWLEDGE";
- private String clientId = "TestClient";
- private String subject = "TEST.FOO";
+ private String clientId;
+ private String subject;
private int batch = 10;
private int messageSize = 255;
private boolean durable = false;
@@ -89,6 +92,105 @@ public class JMSBase
protected boolean transacted = true;
/**
+ * Builder class that allows caller to build the connection factory (optional)
+ *
+ */
+ public static interface ConnectionFactoryBuilder
+ {
+
+ /**
+ * This method is called by the operator to return properly built
+ * (authenticated, connected etc) connection factory
+ *
+ * @return properly built connection factory
+ */
+ public ConnectionFactory buildConnectionFactory();
+ }
+
+ /**
+ * Default implementation for {@link ConnectionFactoryBuilder} that works for ActiveMQ
+ *
+ *
+ */
+ public static class DefaultConnectionFactoryBuilder implements ConnectionFactoryBuilder
+ {
+ protected String connectionFactoryClass;
+
+ @NotNull
+ protected Map<String, String> connectionFactoryProperties = Maps.newHashMap();
+
+ /**
+ * Get properties used to configure this DefaultConnectionFactoryBuilder instance
+ *
+ * @return Map of properties
+ */
+ public Map<String, String> getConnectionFactoryProperties()
+ {
+ return connectionFactoryProperties;
+ }
+
+ /**
+ * Set properties used to configure this DefaultConnectionFactoryBuilder instance.
+ * Note: previous properties are overwritten.
+ *
+ * @param connectionFactoryProperties
+ */
+ public void setConnectionFactoryProperties(Map<String, String> connectionFactoryProperties)
+ {
+ this.connectionFactoryProperties = connectionFactoryProperties;
+ }
+
+ /**
+ * Get the fully qualified class-name of the connection factory that is used by this
+ * builder to instantiate the connection factory
+ *
+ * @return fully qualified class-name
+ */
+ public String getConnectionFactoryClass()
+ {
+ return connectionFactoryClass;
+ }
+
+ /**
+ * Set the fully qualified class-name of the connection factory that is used by this
+ * builder to instantiate the connection factory
+ *
+ * @param connectionFactoryClass fully qualified class-name
+ */
+ public void setConnectionFactoryClass(String connectionFactoryClass)
+ {
+ this.connectionFactoryClass = connectionFactoryClass;
+ }
+
+ @Override
+ public ConnectionFactory buildConnectionFactory()
+ {
+ ConnectionFactory cf;
+ try {
+ if (connectionFactoryClass != null) {
+ @SuppressWarnings("unchecked")
+ Class<ConnectionFactory> clazz = (Class<ConnectionFactory>)Class.forName(connectionFactoryClass);
+ cf = clazz.newInstance();
+ } else {
+ cf = new org.apache.activemq.ActiveMQConnectionFactory();
+ }
+ BeanUtils.populate(cf, connectionFactoryProperties);
+ logger.debug("creation successful.");
+ return cf;
+ } catch (Exception e) {
+ DTThrowable.rethrow(e);
+ return null; // previous rethrow makes this redundant, but compiler doesn't know...
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "DefaultConnectionFactoryBuilder [connectionFactoryProperties=" + connectionFactoryProperties + "]";
+ }
+ }
+
+ /**
* @return the connection
*/
public Connection getConnection()
@@ -111,15 +213,68 @@ public class JMSBase
{
return destination;
}
-
+
+ /**
+ * gets the connection factory class-name used by the default connection factory builder
+ *
+ * @return connection factory class-name
+ */
public String getConnectionFactoryClass()
{
- return connectionFactoryClass;
+ if (connectionFactoryBuilder == null) {
+ connectionFactoryBuilder = createDefaultConnectionFactoryBuilderIfRequired();
+ }
+ if (connectionFactoryBuilder instanceof DefaultConnectionFactoryBuilder) {
+ return ((DefaultConnectionFactoryBuilder)connectionFactoryBuilder).getConnectionFactoryClass();
+ } else {
+ throw new UnsupportedOperationException("ConnectionFactoryBuilder does not support connectionFactoryClass");
+ }
}
+ /**
+ * if the existing connectionFactoryBuilder is not of type DefaultConnectionFactoryBuilder
+ * create one.
+ *
+ * @return the current DefaultConnectionFactoryBuilder value
+ */
+ private DefaultConnectionFactoryBuilder createDefaultConnectionFactoryBuilderIfRequired()
+ {
+ if (!(connectionFactoryBuilder instanceof DefaultConnectionFactoryBuilder)) {
+ connectionFactoryBuilder = new DefaultConnectionFactoryBuilder();
+ }
+ return (DefaultConnectionFactoryBuilder)connectionFactoryBuilder;
+ }
+
+ /**
+ * Sets the connection factory class-name used by the default connection factory builder
+ *
+ * @param connectionFactoryClass factory class-name to be set
+ */
public void setConnectionFactoryClass(String connectionFactoryClass)
{
- this.connectionFactoryClass = connectionFactoryClass;
+ DefaultConnectionFactoryBuilder builder =
+ createDefaultConnectionFactoryBuilderIfRequired();
+ builder.setConnectionFactoryClass(connectionFactoryClass);
+ }
+
+ /**
+ * gets the connection factory builder of this instance
+ *
+ * @return connection factory builder
+ */
+ public ConnectionFactoryBuilder getConnectionFactoryBuilder()
+ {
+ return connectionFactoryBuilder;
+ }
+
+ /**
+ * Sets the connection factory builder of this instance
+ *
+ * @param connectionFactoryBuilder connection factory builder for this instance
+ */
+ public void setConnectionFactoryBuilder(ConnectionFactoryBuilder connectionFactoryBuilder)
+ {
+ this.connectionFactoryBuilder = connectionFactoryBuilder;
}
/**
@@ -129,12 +284,27 @@ public class JMSBase
*/
public Map<String, String> getConnectionFactoryProperties()
{
- return connectionFactoryProperties;
+ if (connectionFactoryBuilder == null) {
+ connectionFactoryBuilder = createDefaultConnectionFactoryBuilderIfRequired();
+ }
+ if (connectionFactoryBuilder instanceof DefaultConnectionFactoryBuilder) {
+ return ((DefaultConnectionFactoryBuilder)connectionFactoryBuilder).getConnectionFactoryProperties();
+ } else {
+ throw new UnsupportedOperationException("ConnectionFactoryBuilder does not support connectionFactoryProperties");
+ }
}
+ /**
+ * Sets the connection factory properties. Property names are provider specific and can be set directly from configuration, for example:<p>
+ * <code>dt.operator.JMSOper.connectionFactoryProperties.brokerURL=vm://localhost<code>
+ *
+ * @param connectionFactoryProperties reference to mutable properties
+ */
public void setConnectionFactoryProperties(Map<String, String> connectionFactoryProperties)
{
- this.connectionFactoryProperties = connectionFactoryProperties;
+ DefaultConnectionFactoryBuilder builder =
+ createDefaultConnectionFactoryBuilderIfRequired();
+ builder.setConnectionFactoryProperties(connectionFactoryProperties);
}
/**
@@ -143,7 +313,7 @@ public class JMSBase
@Deprecated
public void setUser(String user)
{
- this.connectionFactoryProperties.put("userName", user);
+ this.getConnectionFactoryProperties().put("userName", user);
}
/**
@@ -152,7 +322,7 @@ public class JMSBase
@Deprecated
public void setPassword(String password)
{
- this.connectionFactoryProperties.put("password", password);
+ this.getConnectionFactoryProperties().put("password", password);
}
/**
@@ -161,7 +331,7 @@ public class JMSBase
@Deprecated
public void setUrl(String url)
{
- this.connectionFactoryProperties.put("brokerURL", url);
+ this.getConnectionFactoryProperties().put("brokerURL", url);
}
/**
@@ -355,22 +525,10 @@ public class JMSBase
*/
protected ConnectionFactory getConnectionFactory()
{
- logger.debug("class {} properties {}", connectionFactoryClass, connectionFactoryProperties);
- ConnectionFactory cf;
- try {
- if (connectionFactoryClass != null) {
- @SuppressWarnings("unchecked")
- Class<ConnectionFactory> clazz = (Class<ConnectionFactory>)Class.forName(connectionFactoryClass);
- cf = clazz.newInstance();
- } else {
- cf = new org.apache.activemq.ActiveMQConnectionFactory();
- }
- BeanUtils.populate(cf, connectionFactoryProperties);
- logger.debug("creation successful.");
- return cf;
- } catch (Exception e) {
- throw new RuntimeException("Failed to create connection factory.", e);
- }
+ logger.debug("connectionFactoryBuilder {}", "" + connectionFactoryBuilder);
+
+ return connectionFactoryBuilder.buildConnectionFactory();
+
}
/**
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/test/java/com/datatorrent/lib/io/jms/JMSObjectInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/jms/JMSObjectInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/jms/JMSObjectInputOperatorTest.java
index 06e94c6..e4967ca 100644
--- a/library/src/test/java/com/datatorrent/lib/io/jms/JMSObjectInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/jms/JMSObjectInputOperatorTest.java
@@ -81,6 +81,7 @@ public class JMSObjectInputOperatorTest
context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap);
operator = new JMSObjectInputOperator();
+ operator.setSubject("TEST.FOO");
operator.getConnectionFactoryProperties().put(JMSTestBase.AMQ_BROKER_URL, "vm://localhost");
sink = new CollectorTestSink<Object>();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java
index 42f730c..82f1c67 100644
--- a/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java
@@ -77,6 +77,7 @@ public class JMSStringInputOperatorTest
context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap);
operator = new JMSStringInputOperator();
+ operator.setSubject("TEST.FOO");
operator.getConnectionFactoryProperties().put(JMSTestBase.AMQ_BROKER_URL, "vm://localhost");
sink = new CollectorTestSink<>();
@@ -145,6 +146,7 @@ public class JMSStringInputOperatorTest
throw new RuntimeException("fail ack");
}
};
+ testMeta.operator.setSubject("TEST.FOO");
testMeta.operator.getConnectionFactoryProperties().put(JMSTestBase.AMQ_BROKER_URL, "vm://localhost");
testMeta.operator.setup(testMeta.context);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java
new file mode 100644
index 0000000..53e787d
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.jms;
+
+import java.io.File;
+
+import javax.jms.ConnectionFactory;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.internal.AssumptionViolatedException;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+
+import com.amazon.sqs.javamessaging.SQSConnectionFactory;
+import com.amazonaws.auth.PropertiesFileCredentialsProvider;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Tests for {@link JMSStringInputOperator} for AMZ SQS.
+ * Note: for SQS we should use AckMode as "AUTO_ACKNOWLEDGE" and
+ * no transacted mode (transacted = false).
+ *
+ * Note: check the comment for com.amazon.sqs.javamessaging.SQSMessageConsumer.close()
+ * specifically: "Since consumer prefetch threads use SQS long-poll feature with 20 seconds
+ * timeout, closing each consumer prefetch thread can take up to 20 seconds,
+ * which in-turn will impact the time on consumer close."
+ *
+ * Because of the above this test takes a long time due to consumer.close() in
+ * com.datatorrent.lib.io.jms.AbstractJMSInputOperator.cleanup()
+ *
+ * NOTE: tests are automatically skipped if the secret key in sqstestCreds.properties
+ * is missing or blank.
+ *
+ * NOTE: each test creates its own uniquely named queue in SQS and then deletes it afterwards.
+ * Also we try to scrub any leftover queues from the previous runs just in case tests were
+ * aborted (check com.datatorrent.lib.io.jms.SQSTestBase.generateCurrentQueueName(String))
+ *
+ */
+public class SQSStringInputOperatorTest
+{
+ public static class TestMeta extends TestWatcher
+ {
+ String baseDir;
+ JMSStringInputOperator operator;
+ CollectorTestSink<Object> sink;
+ Context.OperatorContext context;
+ SQSTestBase testBase;
+
+ @Override
+ protected void starting(Description description)
+ {
+ final String methodName = description.getMethodName();
+ final String className = description.getClassName();
+
+ testBase = new SQSTestBase();
+ if (testBase.validateTestCreds() == false) {
+ return;
+ }
+ testBase.generateCurrentQueueName(methodName);
+ try {
+ testBase.beforTest();
+ } catch (AssumptionViolatedException ave) {
+ throw ave;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ baseDir = "target/" + className + "/" + methodName;
+
+ Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500);
+ attributeMap.put(Context.DAGContext.APPLICATION_PATH, baseDir);
+
+ context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap);
+ operator = new JMSStringInputOperator();
+ operator.setConnectionFactoryBuilder(new JMSBase.ConnectionFactoryBuilder()
+ {
+
+ @Override
+ public ConnectionFactory buildConnectionFactory()
+ {
+ // Create the connection factory using the environment variable credential provider.
+ // Connections this factory creates can talk to the queues in us-east-1 region.
+ SQSConnectionFactory connectionFactory =
+ SQSConnectionFactory.builder()
+ .withRegion(Region.getRegion(Regions.US_EAST_1))
+ .withAWSCredentialsProvider(new PropertiesFileCredentialsProvider(testBase.getDevCredsFilePath()))
+ .build();
+ return connectionFactory;
+ }
+
+ @Override
+ public String toString()
+ {
+ return className + "/" + methodName + "/ConnectionFactoryBuilder";
+ }
+
+ });
+ operator.setSubject(testBase.getCurrentQueueName());
+ // for SQS ack mode should be "AUTO_ACKNOWLEDGE" and transacted = false
+ operator.setAckMode("AUTO_ACKNOWLEDGE");
+ operator.setTransacted(false);
+
+ sink = new CollectorTestSink<>();
+ operator.output.setSink(sink);
+ operator.setup(context);
+ operator.activate(context);
+ }
+
+ @Override
+ protected void finished(Description description)
+ {
+ if (operator == null) {
+ Assert.assertFalse(testBase.validateTestCreds());
+ return;
+ }
+ operator.deactivate();
+ operator.teardown();
+ try {
+ FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
+ testBase.afterTest();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Rule
+ public TestMeta testMeta = new TestMeta();
+
+
+ /**
+ * Basic string input test
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testStringMsgInput() throws Exception
+ {
+ testMeta.testBase.validateAssumption();
+ testMeta.testBase.produceMsg("testStringMsgInput", 10, false);
+ Thread.sleep(1000);
+ testMeta.operator.emitTuples();
+ Assert.assertEquals("num of messages", 10, testMeta.sink.collectedTuples.size());
+ }
+
+
+ @Test
+ public void testRecoveryAndIdempotency() throws Exception
+ {
+ testMeta.testBase.validateAssumption();
+ testMeta.testBase.produceUniqueMsgs("testRecoveryAndIdempotency", 25, false);
+ Thread.sleep(3000);
+ testMeta.operator.beginWindow(1);
+ testMeta.operator.emitTuples();
+ testMeta.operator.endWindow();
+
+ Assert.assertEquals("num of messages in window 1 pre-failure", 25, testMeta.sink.collectedTuples.size());
+
+ // for some reason AMZ SQS doesn't preserve the order producer->consumer or we might have a race
+ // condition on our producer side, so we can't be sure that get(4) will return "4:..."
+ // In any case we will only check message matching between pre-failure
+ // and post-failure cases for 4th and 17th message
+ final String message4 = (String)testMeta.sink.collectedTuples.get(4);
+ final String message17 = (String)testMeta.sink.collectedTuples.get(17);
+
+ //failure and then re-deployment of operator
+ testMeta.sink.collectedTuples.clear();
+ testMeta.operator.setup(testMeta.context);
+ testMeta.operator.activate(testMeta.context);
+
+ Assert.assertEquals("largest recovery window", 1,
+ testMeta.operator.getWindowDataManager().getLargestRecoveryWindow());
+
+ testMeta.operator.beginWindow(1);
+ testMeta.operator.endWindow();
+ Assert.assertEquals("num of messages in window 1", 25, testMeta.sink.collectedTuples.size());
+ Assert.assertEquals(message4, testMeta.sink.collectedTuples.get(4));
+ Assert.assertEquals(message17, testMeta.sink.collectedTuples.get(17));
+ testMeta.sink.collectedTuples.clear();
+ }
+
+
+ /**
+ * This test is different from the one in JMSStringInputOperatorTest because of the differences
+ * in acknowledge mode. There is no Ack failure but rather failure in emit Tuple. But endWindow
+ * eventually drains the holdingBuffer and emits all the outstanding tuples so we see 9 tuples
+ * eventually. Because of the async nature of the operator (messages are delivered to the operator
+ * as an async listener) we can't be sure how many messages are present in testMeta.sink.collectedTuples
+ * before endWindow so our assertion is just testMeta.sink.collectedTuples.size() < 9
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testFailureAfterPersistenceAndBeforeRecovery() throws Exception
+ {
+ testMeta.testBase.validateAssumption();
+ testMeta.sink = new CollectorTestSink<Object>()
+ {
+ @Override
+ public void put(Object payload)
+ {
+ if (payload instanceof String && ((String)payload).startsWith("4:")) {
+ throw new RuntimeException("fail 4th message");
+ }
+ synchronized (collectedTuples) {
+ collectedTuples.add(payload);
+ collectedTuples.notifyAll();
+ }
+ }
+ };
+ testMeta.operator.output.setSink(testMeta.sink);
+
+ testMeta.testBase.produceUniqueMsgs("testFailureAfterPersistenceAndBeforeRecovery", 10, false);
+ Thread.sleep(1000);
+ testMeta.operator.beginWindow(1);
+ try {
+ testMeta.operator.emitTuples();
+ } catch (Throwable t) {
+ LOG.debug("emit exception");
+ }
+ Assert.assertTrue("num of messages before endWindow 1", testMeta.sink.collectedTuples.size() < 9);
+ testMeta.operator.endWindow();
+ Assert.assertEquals("num of messages after endWindow 1", 9, testMeta.sink.collectedTuples.size());
+
+ testMeta.operator.setup(testMeta.context);
+ testMeta.operator.activate(testMeta.context);
+
+ Assert.assertEquals("window 1 should exist", 1,
+ testMeta.operator.getWindowDataManager().getLargestRecoveryWindow());
+ }
+
+
+ private static final transient Logger LOG = LoggerFactory.getLogger(SQSStringInputOperatorTest.class);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/test/java/com/datatorrent/lib/io/jms/SQSTestBase.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/jms/SQSTestBase.java b/library/src/test/java/com/datatorrent/lib/io/jms/SQSTestBase.java
new file mode 100644
index 0000000..4dc63c3
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/jms/SQSTestBase.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.jms;
+
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+
+import com.amazonaws.auth.PropertiesCredentials;
+import com.amazonaws.auth.PropertiesFileCredentialsProvider;
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.CreateQueueRequest;
+import com.amazonaws.services.sqs.model.CreateQueueResult;
+import com.amazonaws.services.sqs.model.ListQueuesResult;
+import com.amazonaws.services.sqs.model.PurgeQueueRequest;
+
+/**
+ * Base class for SQS tests. <br/>
+ * Various SQS (AWS) related helper functions
+ */
+public class SQSTestBase
+{
+ public static final String TEST_FOO = "TEST_FOO";
+
+ public PropertiesCredentials testCreds;
+
+ /**
+ * creds to be used by the dev end eg. by the JMSInputOperator in SQS mode
+ */
+ private static String SQSDEV_CREDS_FILENAME = "/sqsdevCreds.properties";
+
+ /**
+ * creds to be used by the test end eg. by SQSInputOperatorTest
+ */
+ private static String SQSTEST_CREDS_FILENAME = "/sqstestCreds.properties";
+
+ private AmazonSQSClient sqs;
+
+ private String currentQueueName;
+ private String currentQueueUrl;
+
+
+ public SQSTestBase()
+ {
+ PropertiesFileCredentialsProvider file = new PropertiesFileCredentialsProvider(getTestCredsFilePath());
+ testCreds = (PropertiesCredentials)file.getCredentials();
+ sqs = new AmazonSQSClient(testCreds);
+ }
+
+ public String getTestCredsFilePath()
+ {
+ return getClass().getResource(SQSTEST_CREDS_FILENAME).getFile();
+ }
+
+ public String getDevCredsFilePath()
+ {
+ return getClass().getResource(SQSDEV_CREDS_FILENAME).getFile();
+ }
+
+ public String getCurrentQueueName()
+ {
+ return currentQueueName;
+ }
+
+ public void setCurrentQueueName(String currentQueueName)
+ {
+ this.currentQueueName = currentQueueName;
+ }
+
+ /**
+ * Each test creates its own uniquely named queue in SQS and then deletes it afterwards.
+ * We try to scrub any leftover queues from the previous runs just in case tests were
+ * aborted
+ *
+ * @param currentQueueNamePrefix
+ */
+ public void generateCurrentQueueName(String currentQueueNamePrefix)
+ {
+ if (validateTestCreds()) {
+ ListQueuesResult list = sqs.listQueues(currentQueueNamePrefix);
+ for (String url : list.getQueueUrls()) {
+ sqs.deleteQueue(url);
+ }
+ }
+ this.currentQueueName = currentQueueNamePrefix + System.currentTimeMillis();
+ }
+
+ public void produceMsg(String[] msgs, boolean purgeFirst) throws Exception
+ {
+ CreateQueueResult res = sqs.createQueue(getCurrentQueueName());
+ if (purgeFirst) {
+ PurgeQueueRequest purgeReq = new PurgeQueueRequest(res.getQueueUrl());
+ sqs.purgeQueue(purgeReq);
+ }
+ for (String text : msgs) {
+ sqs.sendMessage(res.getQueueUrl(), text);
+ }
+ }
+
+ /**
+ *
+ * @param text
+ * @throws Exception
+ */
+ public void produceMsg(String text, boolean purgeFirst) throws Exception
+ {
+ produceMsg(new String[] {text}, purgeFirst);
+ }
+
+ /**
+ * TODO: copy the logic of JMSTestBase.produceMsg
+ *
+ * @param text
+ * @throws Exception
+ */
+ public void produceMsg(String text, int num, boolean purgeFirst) throws Exception
+ {
+ String[] array = new String[num];
+ for (int i = 0; i < num; i++) {
+ array[i] = text;
+ }
+ produceMsg(array, purgeFirst);
+ }
+
+ /**
+ * Produce unique messages
+ *
+ * @param text
+ * @throws Exception
+ */
+ public void produceUniqueMsgs(String text, int num, boolean purgeFirst) throws Exception
+ {
+ String[] array = new String[num];
+ for (int i = 0; i < num; i++) {
+ array[i] = "" + i + ":" + text;
+ }
+ produceMsg(array, purgeFirst);
+ }
+
+ public boolean validateTestCreds()
+ {
+ return testCreds.getAWSSecretKey() != null &&
+ testCreds.getAWSSecretKey().trim().isEmpty() == false;
+ }
+
+ public void validateAssumption()
+ {
+ Assume.assumeTrue(validateTestCreds());
+ }
+
+
+ /**
+ * create a queue we can use for testing
+ *
+ * @throws Exception
+ */
+ @Before
+ public void beforTest() throws Exception
+ {
+ validateAssumption();
+ // Create a queue
+ CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName(getCurrentQueueName());
+ currentQueueUrl = sqs.createQueue(createQueueRequest).getQueueUrl();
+ }
+
+ @After
+ public void afterTest() throws Exception
+ {
+ sqs.deleteQueue(currentQueueUrl);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/test/resources/sqsdevCreds.properties
----------------------------------------------------------------------
diff --git a/library/src/test/resources/sqsdevCreds.properties b/library/src/test/resources/sqsdevCreds.properties
new file mode 100644
index 0000000..3ce01e4
--- /dev/null
+++ b/library/src/test/resources/sqsdevCreds.properties
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+accessKey=
+secretKey=
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/test/resources/sqstestCreds.properties
----------------------------------------------------------------------
diff --git a/library/src/test/resources/sqstestCreds.properties b/library/src/test/resources/sqstestCreds.properties
new file mode 100644
index 0000000..3ce01e4
--- /dev/null
+++ b/library/src/test/resources/sqstestCreds.properties
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+accessKey=
+secretKey=
[2/2] apex-malhar git commit: Merge branch
'APEXMALHAR-2156.improvement2' of github.com:sanjaypujare/apex-malhar
Posted by pr...@apache.org.
Merge branch 'APEXMALHAR-2156.improvement2' of github.com:sanjaypujare/apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/37cb5848
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/37cb5848
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/37cb5848
Branch: refs/heads/master
Commit: 37cb584841810dbc2ac099b6d0ba30cdea36f1d1
Parents: 5972bca d46a6be
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Tue Aug 9 16:42:07 2016 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Tue Aug 9 16:42:07 2016 -0700
----------------------------------------------------------------------
library/pom.xml | 34 ++-
.../lib/io/jms/AbstractJMSInputOperator.java | 10 +
.../com/datatorrent/lib/io/jms/JMSBase.java | 214 +++++++++++++--
.../lib/io/jms/JMSObjectInputOperatorTest.java | 1 +
.../lib/io/jms/JMSStringInputOperatorTest.java | 2 +
.../lib/io/jms/SQSStringInputOperatorTest.java | 262 +++++++++++++++++++
.../com/datatorrent/lib/io/jms/SQSTestBase.java | 187 +++++++++++++
.../src/test/resources/sqsdevCreds.properties | 21 ++
.../src/test/resources/sqstestCreds.properties | 21 ++
9 files changed, 719 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/37cb5848/library/pom.xml
----------------------------------------------------------------------
diff --cc library/pom.xml
index 026bbae,8fe30d0..b8c6271
--- a/library/pom.xml
+++ b/library/pom.xml
@@@ -343,11 -349,30 +349,29 @@@
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.10.73</version>
- <scope>test</scope>
</dependency>
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>2.5.4</version>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-sqs</artifactId>
+ <version>1.10.73</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.5.4</version>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>amazon-sqs-java-messaging-lib</artifactId>
+ <version>1.0.0</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-sqs</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>