You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2016/08/09 23:58:13 UTC

[1/2] apex-malhar git commit: JMS Input operator changes to support SQS and ActiveMQ Incorporate comments: javadocs, NotNull constraint for the builder and create function to return the builder ref

Repository: apex-malhar
Updated Branches:
  refs/heads/master 5972bca41 -> 37cb58484


JMS Input operator changes to support SQS and ActiveMQ
Incorporate comments: javadocs, NotNull constraint for the builder and create function to return the builder ref


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/d46a6be6
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/d46a6be6
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/d46a6be6

Branch: refs/heads/master
Commit: d46a6be62d2b70bb99ab7ca02d0a87ea2d5eba38
Parents: 7dea3d0
Author: Sanjay Pujare <sa...@Sanjay-DT-MacBook-Pro.local>
Authored: Mon Aug 8 12:40:05 2016 -0700
Committer: Sanjay Pujare <sa...@Sanjay-DT-MacBook-Pro.local>
Committed: Tue Aug 9 16:38:13 2016 -0700

----------------------------------------------------------------------
 library/pom.xml                                 |  34 ++-
 .../lib/io/jms/AbstractJMSInputOperator.java    |  10 +
 .../com/datatorrent/lib/io/jms/JMSBase.java     | 214 +++++++++++++--
 .../lib/io/jms/JMSObjectInputOperatorTest.java  |   1 +
 .../lib/io/jms/JMSStringInputOperatorTest.java  |   2 +
 .../lib/io/jms/SQSStringInputOperatorTest.java  | 262 +++++++++++++++++++
 .../com/datatorrent/lib/io/jms/SQSTestBase.java | 187 +++++++++++++
 .../src/test/resources/sqsdevCreds.properties   |  21 ++
 .../src/test/resources/sqstestCreds.properties  |  21 ++
 9 files changed, 719 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/pom.xml
----------------------------------------------------------------------
diff --git a/library/pom.xml b/library/pom.xml
index 8d264a4..8fe30d0 100644
--- a/library/pom.xml
+++ b/library/pom.xml
@@ -306,12 +306,18 @@
       <artifactId>janino</artifactId>
       <version>2.7.8</version>
       <scope>test</scope>
-    </dependency>    
-     <dependency>
+    </dependency>
+    <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
       <version>1.8.5</version>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+           <groupId>org.hamcrest</groupId>
+           <artifactId>hamcrest-core</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
@@ -346,9 +352,27 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-	  <groupId>com.fasterxml.jackson.core</groupId>
-	  <artifactId>jackson-databind</artifactId>
-	  <version>2.5.4</version>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-sqs</artifactId>
+      <version>1.10.73</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.5.4</version>
+    </dependency>
+    <dependency>	  
+      <groupId>com.amazonaws</groupId>
+      <artifactId>amazon-sqs-java-messaging-lib</artifactId>
+      <version>1.0.0</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk-sqs</artifactId>
+        </exclusion>
+      </exclusions> 
     </dependency>
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
index cc27c88..bf0fe5c 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java
@@ -514,6 +514,16 @@ public abstract class AbstractJMSInputOperator<T> extends JMSBase
     return this.windowDataManager;
   }
 
+  /**
+   * Updates the {@code transacted} flag held by this operator.
+   *
+   * @param value the flag value to store
+   */
+  public void setTransacted(boolean value)
+  {
+    this.transacted = value;
+  }
+
   protected abstract void emit(T payload);
 
   public static enum CounterKeys

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java b/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java
index 48ed2c3..772464a 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java
@@ -25,6 +25,7 @@ import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Session;
+import javax.validation.constraints.NotNull;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,6 +34,8 @@ import org.apache.commons.beanutils.BeanUtils;
 
 import com.google.common.collect.Maps;
 
+import com.datatorrent.netlet.util.DTThrowable;
+
 /**
  * Base class for any JMS input or output adapter operator.
  * <p/>
@@ -76,11 +79,11 @@ public class JMSBase
   private transient Session session;
   private transient Destination destination;
 
-  private String connectionFactoryClass;
-  private Map<String, String> connectionFactoryProperties = Maps.newHashMap();
+  @NotNull
+  private ConnectionFactoryBuilder connectionFactoryBuilder;
   private String ackMode = "CLIENT_ACKNOWLEDGE";
-  private String clientId = "TestClient";
-  private String subject = "TEST.FOO";
+  private String clientId;
+  private String subject;
   private int batch = 10;
   private int messageSize = 255;
   private boolean durable = false;
@@ -89,6 +92,105 @@ public class JMSBase
   protected boolean transacted = true;
 
   /**
+   * Builder interface that allows the caller to build the connection factory (optional)
+   *
+   */
+  public interface ConnectionFactoryBuilder
+  {
+
+    /**
+     * Invoked by the operator when it needs a connection factory; implementations
+     * hand back one that is completely set up (authenticated, connected etc).
+     *
+     * @return the ready-to-use connection factory
+     */
+    ConnectionFactory buildConnectionFactory();
+  }
+
+  /**
+   * Default implementation for {@link ConnectionFactoryBuilder} that works for ActiveMQ
+   *
+   *
+   */
+  public static class DefaultConnectionFactoryBuilder implements ConnectionFactoryBuilder
+  {
+    protected String connectionFactoryClass;
+
+    @NotNull
+    protected Map<String, String> connectionFactoryProperties = Maps.newHashMap();
+
+    /**
+     * Returns the live (mutable) map of properties applied to the connection factory.
+     */
+    public Map<String, String> getConnectionFactoryProperties()
+    {
+      return connectionFactoryProperties;
+    }
+
+    /**
+     * Replaces the properties applied to the connection factory. The previous map is
+     * discarded and the given map is stored by reference.
+     *
+     * @param connectionFactoryProperties bean property names mapped to their values
+     */
+    public void setConnectionFactoryProperties(Map<String, String> connectionFactoryProperties)
+    {
+      this.connectionFactoryProperties = connectionFactoryProperties;
+    }
+
+    /**
+     * Returns the fully qualified connection factory class-name, or null for the ActiveMQ default.
+     */
+    public String getConnectionFactoryClass()
+    {
+      return connectionFactoryClass;
+    }
+
+    /**
+     * Sets the fully qualified connection factory class-name; null selects the ActiveMQ default.
+     */
+    public void setConnectionFactoryClass(String connectionFactoryClass)
+    {
+      this.connectionFactoryClass = connectionFactoryClass;
+    }
+
+    /**
+     * Instantiates the configured connection factory class (ActiveMQConnectionFactory
+     * when none is set) and applies the properties to it via BeanUtils.populate.
+     *
+     * @return the populated connection factory
+     */
+    @Override
+    public ConnectionFactory buildConnectionFactory()
+    {
+      ConnectionFactory cf;
+      try {
+        if (connectionFactoryClass != null) {
+          // asSubclass rejects a class that is not a ConnectionFactory before instantiating it
+          Class<? extends ConnectionFactory> clazz =
+              Class.forName(connectionFactoryClass).asSubclass(ConnectionFactory.class);
+          cf = clazz.newInstance();
+        } else {
+          cf = new org.apache.activemq.ActiveMQConnectionFactory();
+        }
+        BeanUtils.populate(cf, connectionFactoryProperties);
+        logger.debug("creation successful.");
+        return cf;
+      } catch (Exception e) {
+        DTThrowable.rethrow(e);
+        return null;  // previous rethrow makes this redundant, but compiler doesn't know...
+      }
+    }
+
+    @Override
+    public String toString()
+    {
+      return "DefaultConnectionFactoryBuilder [connectionFactoryClass=" + connectionFactoryClass
+          + ", connectionFactoryProperties=" + connectionFactoryProperties + "]";
+    }
+  }
+
+  /**
    * @return the connection
    */
   public Connection getConnection()
@@ -111,15 +213,68 @@ public class JMSBase
   {
     return destination;
   }
-
+  
+  /**
+   * gets the connection factory class-name used by the default connection factory builder
+   *
+   * @return connection factory class-name
+   */
   public String getConnectionFactoryClass()
   {
-    return connectionFactoryClass;
+    if (connectionFactoryBuilder == null) {
+      connectionFactoryBuilder = createDefaultConnectionFactoryBuilderIfRequired();
+    }
+    if (connectionFactoryBuilder instanceof DefaultConnectionFactoryBuilder) {
+      return ((DefaultConnectionFactoryBuilder)connectionFactoryBuilder).getConnectionFactoryClass();
+    } else {
+      throw new UnsupportedOperationException("ConnectionFactoryBuilder does not support connectionFactoryClass");
+    }
   }
 
+  /**
+   * Ensures the builder field holds a DefaultConnectionFactoryBuilder: a builder that
+   * is null or of any other type is replaced by a newly created default builder.
+   * @return the DefaultConnectionFactoryBuilder now stored in this instance
+   */
+  private DefaultConnectionFactoryBuilder createDefaultConnectionFactoryBuilderIfRequired()
+  {
+    if (connectionFactoryBuilder instanceof DefaultConnectionFactoryBuilder) {
+      return (DefaultConnectionFactoryBuilder)connectionFactoryBuilder;
+    }
+    connectionFactoryBuilder = new DefaultConnectionFactoryBuilder();
+    return (DefaultConnectionFactoryBuilder)connectionFactoryBuilder;
+  }
+  
+  /**
+   * Sets the connection factory class-name used by the default connection factory builder
+   *
+   * @param connectionFactoryClass factory class-name to be set
+   */
   public void setConnectionFactoryClass(String connectionFactoryClass)
   {
-    this.connectionFactoryClass = connectionFactoryClass;
+    DefaultConnectionFactoryBuilder builder =
+        createDefaultConnectionFactoryBuilderIfRequired();
+    builder.setConnectionFactoryClass(connectionFactoryClass);
+  }
+
+  /**
+   * gets the connection factory builder of this instance
+   *
+   * @return connection factory builder
+   */
+  public ConnectionFactoryBuilder getConnectionFactoryBuilder()
+  {
+    return connectionFactoryBuilder;
+  }
+
+  /**
+   * Sets the connection factory builder of this instance
+   *
+   * @param connectionFactoryBuilder connection factory builder for this instance
+   */
+  public void setConnectionFactoryBuilder(ConnectionFactoryBuilder connectionFactoryBuilder)
+  {
+    this.connectionFactoryBuilder = connectionFactoryBuilder;
   }
 
   /**
@@ -129,12 +284,27 @@ public class JMSBase
    */
   public Map<String, String> getConnectionFactoryProperties()
   {
-    return connectionFactoryProperties;
+    if (connectionFactoryBuilder == null) {
+      connectionFactoryBuilder = createDefaultConnectionFactoryBuilderIfRequired();
+    }
+    if (connectionFactoryBuilder instanceof DefaultConnectionFactoryBuilder) {
+      return ((DefaultConnectionFactoryBuilder)connectionFactoryBuilder).getConnectionFactoryProperties();
+    } else {
+      throw new UnsupportedOperationException("ConnectionFactoryBuilder does not support connectionFactoryProperties");
+    }
   }
 
+  /**
+   * Sets the connection factory properties. Property names are provider specific and can be set directly from configuration, for example:<p>
+   * <code>dt.operator.JMSOper.connectionFactoryProperties.brokerURL=vm://localhost</code>
+   *
+   * @param connectionFactoryProperties reference to mutable properties
+   */
   public void setConnectionFactoryProperties(Map<String, String> connectionFactoryProperties)
   {
-    this.connectionFactoryProperties = connectionFactoryProperties;
+    DefaultConnectionFactoryBuilder builder =
+        createDefaultConnectionFactoryBuilderIfRequired();
+    builder.setConnectionFactoryProperties(connectionFactoryProperties);
   }
 
   /**
@@ -143,7 +313,7 @@ public class JMSBase
   @Deprecated
   public void setUser(String user)
   {
-    this.connectionFactoryProperties.put("userName", user);
+    this.getConnectionFactoryProperties().put("userName", user);
   }
 
   /**
@@ -152,7 +322,7 @@ public class JMSBase
   @Deprecated
   public void setPassword(String password)
   {
-    this.connectionFactoryProperties.put("password", password);
+    this.getConnectionFactoryProperties().put("password", password);
   }
 
   /**
@@ -161,7 +331,7 @@ public class JMSBase
   @Deprecated
   public void setUrl(String url)
   {
-    this.connectionFactoryProperties.put("brokerURL", url);
+    this.getConnectionFactoryProperties().put("brokerURL", url);
   }
 
   /**
@@ -355,22 +525,10 @@ public class JMSBase
    */
   protected ConnectionFactory getConnectionFactory()
   {
-    logger.debug("class {} properties {}", connectionFactoryClass, connectionFactoryProperties);
-    ConnectionFactory cf;
-    try {
-      if (connectionFactoryClass != null) {
-        @SuppressWarnings("unchecked")
-        Class<ConnectionFactory> clazz = (Class<ConnectionFactory>)Class.forName(connectionFactoryClass);
-        cf = clazz.newInstance();
-      } else {
-        cf = new org.apache.activemq.ActiveMQConnectionFactory();
-      }
-      BeanUtils.populate(cf, connectionFactoryProperties);
-      logger.debug("creation successful.");
-      return cf;
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to create connection factory.", e);
-    }
+    logger.debug("connectionFactoryBuilder {}", "" + connectionFactoryBuilder);
+
+    return connectionFactoryBuilder.buildConnectionFactory();
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/test/java/com/datatorrent/lib/io/jms/JMSObjectInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/jms/JMSObjectInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/jms/JMSObjectInputOperatorTest.java
index 06e94c6..e4967ca 100644
--- a/library/src/test/java/com/datatorrent/lib/io/jms/JMSObjectInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/jms/JMSObjectInputOperatorTest.java
@@ -81,6 +81,7 @@ public class JMSObjectInputOperatorTest
 
       context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap);
       operator = new JMSObjectInputOperator();
+      operator.setSubject("TEST.FOO");
       operator.getConnectionFactoryProperties().put(JMSTestBase.AMQ_BROKER_URL, "vm://localhost");
 
       sink = new CollectorTestSink<Object>();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java
index 42f730c..82f1c67 100644
--- a/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java
@@ -77,6 +77,7 @@ public class JMSStringInputOperatorTest
 
       context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap);
       operator = new JMSStringInputOperator();
+      operator.setSubject("TEST.FOO");
       operator.getConnectionFactoryProperties().put(JMSTestBase.AMQ_BROKER_URL, "vm://localhost");
 
       sink = new CollectorTestSink<>();
@@ -145,6 +146,7 @@ public class JMSStringInputOperatorTest
         throw new RuntimeException("fail ack");
       }
     };
+    testMeta.operator.setSubject("TEST.FOO");
     testMeta.operator.getConnectionFactoryProperties().put(JMSTestBase.AMQ_BROKER_URL, "vm://localhost");
 
     testMeta.operator.setup(testMeta.context);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java
new file mode 100644
index 0000000..53e787d
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/jms/SQSStringInputOperatorTest.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.jms;
+
+import java.io.File;
+
+import javax.jms.ConnectionFactory;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.internal.AssumptionViolatedException;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+
+import com.amazon.sqs.javamessaging.SQSConnectionFactory;
+import com.amazonaws.auth.PropertiesFileCredentialsProvider;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Tests for {@link JMSStringInputOperator} for AMZ SQS.
+ * Note: for SQS we should use AckMode as "AUTO_ACKNOWLEDGE" and
+ * no transacted mode (transacted = false).
+ *
+ * Note: check the comment for com.amazon.sqs.javamessaging.SQSMessageConsumer.close()
+ * specifically: "Since consumer prefetch threads use SQS long-poll feature with 20 seconds
+ * timeout, closing each consumer prefetch thread can take up to 20 seconds,
+ * which in-turn will impact the time on consumer close."
+ *
+ * Because of the above this test takes a long time due to consumer.close() in
+ * com.datatorrent.lib.io.jms.AbstractJMSInputOperator.cleanup()
+ *
+ * NOTE: tests are automatically skipped if the secret key in sqstestCreds.properties
+ * is missing or blank.
+ *
+ * NOTE: each test creates its own uniquely named queue in SQS and then deletes it afterwards.
+ * Also we try to scrub any leftover queues from the previous runs just in case tests were
+ * aborted (check com.datatorrent.lib.io.jms.SQSTestBase.generateCurrentQueueName(String))
+ *
+ */
+public class SQSStringInputOperatorTest
+{
+  public static class TestMeta extends TestWatcher
+  {
+    String baseDir;
+    JMSStringInputOperator operator;
+    CollectorTestSink<Object> sink;
+    Context.OperatorContext context;
+    SQSTestBase testBase;
+
+    @Override
+    protected void starting(Description description)
+    {
+      final String methodName = description.getMethodName();
+      final String className = description.getClassName();
+
+      testBase = new SQSTestBase();
+      if (!testBase.validateTestCreds()) {
+        // no usable credentials: operator stays null, which finished() treats as "skipped"
+        return;
+      }
+      testBase.generateCurrentQueueName(methodName);
+      try {
+        testBase.beforTest();
+      } catch (AssumptionViolatedException ave) {
+        throw ave;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+
+      baseDir = "target/" + className + "/" + methodName;
+
+      Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500);
+      attributeMap.put(Context.DAGContext.APPLICATION_PATH, baseDir);
+
+      context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap);
+      operator = new JMSStringInputOperator();
+      operator.setConnectionFactoryBuilder(new JMSBase.ConnectionFactoryBuilder()
+      {
+        @Override
+        public ConnectionFactory buildConnectionFactory()
+        {
+          // Credentials come from the dev properties file; the factory targets queues in us-east-1.
+          return SQSConnectionFactory.builder()
+              .withRegion(Region.getRegion(Regions.US_EAST_1))
+              .withAWSCredentialsProvider(new PropertiesFileCredentialsProvider(testBase.getDevCredsFilePath()))
+              .build();
+        }
+
+        @Override
+        public String toString()
+        {
+          return className + "/" + methodName + "/ConnectionFactoryBuilder";
+        }
+      });
+      operator.setSubject(testBase.getCurrentQueueName());
+      // for SQS ack mode should be "AUTO_ACKNOWLEDGE" and transacted = false
+      operator.setAckMode("AUTO_ACKNOWLEDGE");
+      operator.setTransacted(false);
+
+      sink = new CollectorTestSink<>();
+      operator.output.setSink(sink);
+      operator.setup(context);
+      operator.activate(context);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      if (operator == null) {
+        Assert.assertFalse(testBase.validateTestCreds());
+        return;
+      }
+      try {
+        try {
+          operator.deactivate();
+          operator.teardown();
+          FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
+        } finally {
+          // always remove the per-test SQS queue, even when operator shutdown fails
+          testBase.afterTest();
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+
+  /**
+   * Basic string input test: sends 10 copies of one message to the test queue and
+   * expects a single emitTuples() call, made after a one second pause, to emit all 10.
+   * @throws Exception if producing the messages fails or the sleep is interrupted
+   */
+  @Test
+  public void testStringMsgInput() throws Exception
+  {
+    testMeta.testBase.validateAssumption();
+    testMeta.testBase.produceMsg("testStringMsgInput", 10, false);
+    Thread.sleep(1000);
+    testMeta.operator.emitTuples();
+    Assert.assertEquals("num of messages", 10, testMeta.sink.collectedTuples.size());
+  }
+
+
+  @Test
+  public void testRecoveryAndIdempotency() throws Exception
+  {
+    testMeta.testBase.validateAssumption();
+    testMeta.testBase.produceUniqueMsgs("testRecoveryAndIdempotency", 25, false);
+    Thread.sleep(3000);
+    testMeta.operator.beginWindow(1);
+    testMeta.operator.emitTuples();
+    testMeta.operator.endWindow();
+
+    Assert.assertEquals("num of messages in window 1 pre-failure", 25, testMeta.sink.collectedTuples.size());
+
+    // Delivery order was observed not to match production order (SQS itself, or a race
+    //    condition on our producer side), so we can't be sure that get(4) will return "4:..."
+    //    We therefore only check that the tuples at index 4 and index 17 are the same
+    //    before the failure and after recovery.
+    final String message4 = (String)testMeta.sink.collectedTuples.get(4);
+    final String message17 = (String)testMeta.sink.collectedTuples.get(17);
+
+    // simulate failure and re-deployment: setup/activate are called again on the same operator
+    testMeta.sink.collectedTuples.clear();
+    testMeta.operator.setup(testMeta.context);
+    testMeta.operator.activate(testMeta.context);
+
+    Assert.assertEquals("largest recovery window", 1,
+        testMeta.operator.getWindowDataManager().getLargestRecoveryWindow());
+
+    testMeta.operator.beginWindow(1); // window 1 is replayed: no emitTuples() call is made here
+    testMeta.operator.endWindow();
+    Assert.assertEquals("num of messages in window 1", 25, testMeta.sink.collectedTuples.size());
+    Assert.assertEquals(message4, testMeta.sink.collectedTuples.get(4));
+    Assert.assertEquals(message17, testMeta.sink.collectedTuples.get(17));
+    testMeta.sink.collectedTuples.clear();
+  }
+
+
+  /**
+   * This test differs from the one in JMSStringInputOperatorTest because of the different
+   * acknowledge mode: there is no ack failure here, the failure is injected into tuple emission
+   * (the sink throws for the message starting with "4:"). endWindow eventually drains the
+   * holdingBuffer and emits the outstanding tuples, so 9 of the 10 tuples are collected in the end.
+   * Because messages are delivered to the operator asynchronously (it is an async listener) the number
+   * of tuples collected before endWindow is not known, so we only assert that it is less than 9.
+   *
+   * @throws Exception if producing the messages fails or the sleep is interrupted
+   */
+  @Test
+  public void testFailureAfterPersistenceAndBeforeRecovery() throws Exception
+  {
+    testMeta.testBase.validateAssumption();
+    testMeta.sink = new CollectorTestSink<Object>()
+    {
+      @Override
+      public void put(Object payload)
+      {
+        if (payload instanceof String && ((String)payload).startsWith("4:")) {
+          throw new RuntimeException("fail 4th message");
+        }
+        synchronized (collectedTuples) {
+          collectedTuples.add(payload);
+          collectedTuples.notifyAll();
+        }
+      }
+    };
+    testMeta.operator.output.setSink(testMeta.sink);
+
+    testMeta.testBase.produceUniqueMsgs("testFailureAfterPersistenceAndBeforeRecovery", 10, false);
+    Thread.sleep(1000);
+    testMeta.operator.beginWindow(1);
+    try {
+      testMeta.operator.emitTuples();
+    } catch (Throwable t) { // the failure injected by the sink may surface here; deliberately ignored
+      LOG.debug("emit exception");
+    }
+    Assert.assertTrue("num of messages before endWindow 1", testMeta.sink.collectedTuples.size() < 9);
+    testMeta.operator.endWindow();
+    Assert.assertEquals("num of messages after endWindow 1", 9, testMeta.sink.collectedTuples.size());
+
+    testMeta.operator.setup(testMeta.context);
+    testMeta.operator.activate(testMeta.context);
+
+    Assert.assertEquals("window 1 should exist", 1,
+        testMeta.operator.getWindowDataManager().getLargestRecoveryWindow());
+  }
+
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(SQSStringInputOperatorTest.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/test/java/com/datatorrent/lib/io/jms/SQSTestBase.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/jms/SQSTestBase.java b/library/src/test/java/com/datatorrent/lib/io/jms/SQSTestBase.java
new file mode 100644
index 0000000..4dc63c3
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/io/jms/SQSTestBase.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.io.jms;
+
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+
+import com.amazonaws.auth.PropertiesCredentials;
+import com.amazonaws.auth.PropertiesFileCredentialsProvider;
+import com.amazonaws.services.sqs.AmazonSQSClient;
+import com.amazonaws.services.sqs.model.CreateQueueRequest;
+import com.amazonaws.services.sqs.model.CreateQueueResult;
+import com.amazonaws.services.sqs.model.ListQueuesResult;
+import com.amazonaws.services.sqs.model.PurgeQueueRequest;
+
+/**
+ * Base class for SQS tests. <br/>
+ * Various SQS (AWS) related helper functions
+ */
+public class SQSTestBase
+{
+  public static final String TEST_FOO = "TEST_FOO";
+
+  public PropertiesCredentials testCreds;
+
+  /** creds to be used by the dev end, e.g. by the JMS input operator in SQS mode */
+  private static final String SQSDEV_CREDS_FILENAME = "/sqsdevCreds.properties";
+
+  /** creds to be used by the test end, e.g. by SQSStringInputOperatorTest */
+  private static final String SQSTEST_CREDS_FILENAME = "/sqstestCreds.properties";
+
+  private AmazonSQSClient sqs;
+
+  private String currentQueueName;
+  private String currentQueueUrl;
+
+  public SQSTestBase()
+  {
+    PropertiesFileCredentialsProvider file = new PropertiesFileCredentialsProvider(getTestCredsFilePath());
+    testCreds = (PropertiesCredentials)file.getCredentials();
+    sqs = new AmazonSQSClient(testCreds);
+  }
+
+  public String getTestCredsFilePath()
+  {
+    return getClass().getResource(SQSTEST_CREDS_FILENAME).getFile();
+  }
+
+  public String getDevCredsFilePath()
+  {
+    return getClass().getResource(SQSDEV_CREDS_FILENAME).getFile();
+  }
+
+  public String getCurrentQueueName()
+  {
+    return currentQueueName;
+  }
+
+  public void setCurrentQueueName(String currentQueueName)
+  {
+    this.currentQueueName = currentQueueName;
+  }
+
+  /**
+   * Each test creates its own uniquely named queue in SQS and then deletes it afterwards.
+   * When test credentials are available, queues left over from previous (possibly aborted)
+   * runs whose names start with the prefix are deleted first; the new name is the prefix
+   * followed by the current time in milliseconds.
+   * @param currentQueueNamePrefix prefix of the queue name, e.g. the test method name
+   */
+  public void generateCurrentQueueName(String currentQueueNamePrefix)
+  {
+    if (validateTestCreds()) {
+      ListQueuesResult list = sqs.listQueues(currentQueueNamePrefix);
+      for (String url : list.getQueueUrls()) {
+        sqs.deleteQueue(url);
+      }
+    }
+    this.currentQueueName = currentQueueNamePrefix + System.currentTimeMillis();
+  }
+
+  /**
+   * Calls createQueue for the current queue name and sends every message in msgs to the
+   * returned queue URL; when purgeFirst is true the queue is purged before sending.
+   */
+  public void produceMsg(String[] msgs, boolean purgeFirst) throws Exception
+  {
+    CreateQueueResult res = sqs.createQueue(getCurrentQueueName());
+    if (purgeFirst) {
+      PurgeQueueRequest purgeReq = new PurgeQueueRequest(res.getQueueUrl());
+      sqs.purgeQueue(purgeReq);
+    }
+    for (String text : msgs) {
+      sqs.sendMessage(res.getQueueUrl(), text);
+    }
+  }
+
+  /**
+   * Sends a single message to the current queue.
+   * @param text message body
+   * @throws Exception if sending fails
+   */
+  public void produceMsg(String text, boolean purgeFirst) throws Exception
+  {
+    produceMsg(new String[] {text}, purgeFirst);
+  }
+
+  /**
+   * Sends num copies of the same message to the current queue.
+   * TODO: copy the logic of JMSTestBase.produceMsg
+   * @param text message body to repeat
+   * @throws Exception if sending fails
+   */
+  public void produceMsg(String text, int num, boolean purgeFirst) throws Exception
+  {
+    String[] array = new String[num];
+    for (int i = 0; i < num; i++) {
+      array[i] = text;
+    }
+    produceMsg(array, purgeFirst);
+  }
+
+  /**
+   * Sends num distinct messages of the form "i:text" (i from 0 to num - 1) to the current queue.
+   *
+   * @param text suffix shared by all messages
+   * @throws Exception if sending fails
+   */
+  public void produceUniqueMsgs(String text, int num, boolean purgeFirst) throws Exception
+  {
+    String[] array = new String[num];
+    for (int i = 0; i < num; i++) {
+      array[i] = "" + i + ":" + text;
+    }
+    produceMsg(array, purgeFirst);
+  }
+
+  public boolean validateTestCreds()
+  {
+    String secretKey = testCreds.getAWSSecretKey();
+    return secretKey != null && !secretKey.trim().isEmpty();
+  }
+
+  public void validateAssumption()
+  {
+    Assume.assumeTrue(validateTestCreds());
+  }
+
+  /**
+   * Creates the queue used by the current test; the test is skipped (assumption failure)
+   * when no test credentials are configured.
+   * @throws Exception if the queue cannot be created
+   */
+  @Before
+  public void beforTest() throws Exception
+  {
+    validateAssumption();
+    // Create a queue
+    CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName(getCurrentQueueName());
+    currentQueueUrl = sqs.createQueue(createQueueRequest).getQueueUrl();
+  }
+
+  @After
+  public void afterTest() throws Exception
+  {
+    if (currentQueueUrl != null) { // beforTest may not have created a queue (e.g. skipped test)
+      sqs.deleteQueue(currentQueueUrl);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/test/resources/sqsdevCreds.properties
----------------------------------------------------------------------
diff --git a/library/src/test/resources/sqsdevCreds.properties b/library/src/test/resources/sqsdevCreds.properties
new file mode 100644
index 0000000..3ce01e4
--- /dev/null
+++ b/library/src/test/resources/sqsdevCreds.properties
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+accessKey=
+secretKey=

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d46a6be6/library/src/test/resources/sqstestCreds.properties
----------------------------------------------------------------------
diff --git a/library/src/test/resources/sqstestCreds.properties b/library/src/test/resources/sqstestCreds.properties
new file mode 100644
index 0000000..3ce01e4
--- /dev/null
+++ b/library/src/test/resources/sqstestCreds.properties
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+accessKey=
+secretKey=


[2/2] apex-malhar git commit: Merge branch 'APEXMALHAR-2156.improvement2' of github.com:sanjaypujare/apex-malhar

Posted by pr...@apache.org.
Merge branch 'APEXMALHAR-2156.improvement2' of github.com:sanjaypujare/apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/37cb5848
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/37cb5848
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/37cb5848

Branch: refs/heads/master
Commit: 37cb584841810dbc2ac099b6d0ba30cdea36f1d1
Parents: 5972bca d46a6be
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Tue Aug 9 16:42:07 2016 -0700
Committer: Pramod Immaneni <pr...@datatorrent.com>
Committed: Tue Aug 9 16:42:07 2016 -0700

----------------------------------------------------------------------
 library/pom.xml                                 |  34 ++-
 .../lib/io/jms/AbstractJMSInputOperator.java    |  10 +
 .../com/datatorrent/lib/io/jms/JMSBase.java     | 214 +++++++++++++--
 .../lib/io/jms/JMSObjectInputOperatorTest.java  |   1 +
 .../lib/io/jms/JMSStringInputOperatorTest.java  |   2 +
 .../lib/io/jms/SQSStringInputOperatorTest.java  | 262 +++++++++++++++++++
 .../com/datatorrent/lib/io/jms/SQSTestBase.java | 187 +++++++++++++
 .../src/test/resources/sqsdevCreds.properties   |  21 ++
 .../src/test/resources/sqstestCreds.properties  |  21 ++
 9 files changed, 719 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/37cb5848/library/pom.xml
----------------------------------------------------------------------
diff --cc library/pom.xml
index 026bbae,8fe30d0..b8c6271
--- a/library/pom.xml
+++ b/library/pom.xml
@@@ -343,11 -349,30 +349,29 @@@
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-s3</artifactId>
        <version>1.10.73</version>
 -      <scope>test</scope>
      </dependency>
      <dependency>
- 	  <groupId>com.fasterxml.jackson.core</groupId>
- 	  <artifactId>jackson-databind</artifactId>
- 	  <version>2.5.4</version>
+       <groupId>com.amazonaws</groupId>
+       <artifactId>aws-java-sdk-sqs</artifactId>
+       <version>1.10.73</version>
+       <scope>test</scope>
+     </dependency>
+     <dependency>
+       <groupId>com.fasterxml.jackson.core</groupId>
+       <artifactId>jackson-databind</artifactId>
+       <version>2.5.4</version>
+     </dependency>
+     <dependency>	  
+       <groupId>com.amazonaws</groupId>
+       <artifactId>amazon-sqs-java-messaging-lib</artifactId>
+       <version>1.0.0</version>
+       <scope>test</scope>
+       <exclusions>
+         <exclusion>
+           <groupId>com.amazonaws</groupId>
+           <artifactId>aws-java-sdk-sqs</artifactId>
+         </exclusion>
+       </exclusions> 
      </dependency>
    </dependencies>