You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/07/13 21:11:56 UTC

[apex-malhar] branch master updated: APEXMALHAR-2434 Use fixed/settable metaQueueName in JMSTransactionableStore. Sanjay Pujare is a contributing author. This closes #612.

This is an automated email from the ASF dual-hosted git repository.

pramod pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git


The following commit(s) were added to refs/heads/master by this push:
     new afb4252  APEXMALHAR-2434 Use fixed/settable metaQueueName in JMSTransactionableStore. Sanjay Pujare <sa...@users.noreply.github.com> is a contributing author. This closes #612.
afb4252 is described below

commit afb4252435745669bc999c8a6b4e7fa2b58cf096
Author: oliverwnk <ol...@gmail.com>
AuthorDate: Wed Jul 12 23:17:37 2017 -0700

    APEXMALHAR-2434 Use fixed/settable metaQueueName in JMSTransactionableStore. Sanjay Pujare <sa...@users.noreply.github.com> is a contributing author. This closes #612.
---
 .../lib/io/jms/JMSTransactionableStore.java        |  75 +++++++----
 .../io/jms/JMSTransactionableStoreTestBase.java    | 137 +++++++++++++++------
 2 files changed, 147 insertions(+), 65 deletions(-)

diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java b/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java
index 11b8447..996bfd4 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java
@@ -31,9 +31,14 @@ import javax.jms.QueueBrowser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.annotation.Stateless;
+
 /**
  * This transactionable store commits the messages sent within a window along with the windowId of the completed window
- * to JMS. This store ensures that the JMS output operator is capable of outputting data to JMS exactly once.
+ * to JMS. WindowIds will be sent to a subject specific meta-data queue with the name of the form '{subject}.metadata'.
+ * It is the responsibility of the user to create the meta-data queue in the JMS provider.
+ * A MessageSelector and an unique 'appOperatorId' message property ensure each operator receives its own windowId.
+ * This store ensures that the JMS output operator is capable of outputting data to JMS exactly once.
  *
  * @since 2.0.0
  */
@@ -44,6 +49,8 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
 
   private transient MessageProducer producer;
   private transient MessageConsumer consumer;
+  private String metaQueueName;
+  private static final String APP_OPERATOR_ID = "appOperatorId";
 
   /**
    * Indicates whether the store is connected or not.
@@ -58,6 +65,26 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
   {
   }
 
+  /**
+   * Get the meta queue name for this store
+   *
+   * @return the metaQueueName
+   */
+  public String getMetaQueueName()
+  {
+    return metaQueueName;
+  }
+
+  /**
+   * Set the meta queue name for this store
+   *
+   * @param metaQueueName the metaQueueName to set
+   */
+  public void setMetaQueueName(String metaQueueName)
+  {
+    this.metaQueueName = metaQueueName;
+  }
+
   @Override
   @SuppressWarnings("rawtypes")
   public long getCommittedWindowId(String appId, int operatorId)
@@ -65,17 +92,15 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
     logger.debug("Getting committed windowId appId {} operatorId {}", appId, operatorId);
 
     try {
-
       beginTransaction();
       BytesMessage message = (BytesMessage)consumer.receive();
-      logger.debug("Retrieved committed window message id {}", message.getJMSMessageID());
+      logger.debug("Retrieved committed window messageId: {}, messageAppOperatorIdProp: {}", message.getJMSMessageID(),
+          message.getStringProperty(APP_OPERATOR_ID));
       long windowId = message.readLong();
 
-      message = getBase().getSession().createBytesMessage();
-      message.writeLong(windowId);
-      producer.send(message);
+      writeWindowId(appId, operatorId, windowId);
       commitTransaction();
-
+      logger.debug("metaQueueName: " + metaQueueName);
       logger.debug("Retrieved windowId {}", windowId);
       return windowId;
     } catch (JMSException ex) {
@@ -94,10 +119,7 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
         appId, operatorId, windowId);
     try {
       removeCommittedWindowId(appId, operatorId);
-      BytesMessage bytesMessage = this.getBase().getSession().createBytesMessage();
-      bytesMessage.writeLong(windowId);
-      producer.send(bytesMessage);
-      logger.debug("Retrieved committed window message id {}", bytesMessage.getJMSMessageID());
+      writeWindowId(appId, operatorId, windowId);
     } catch (JMSException ex) {
       throw new RuntimeException(ex);
     }
@@ -113,6 +135,15 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
     }
   }
 
+  private void writeWindowId(String appId, int operatorId, long windowId) throws JMSException
+  {
+    BytesMessage message = getBase().getSession().createBytesMessage();
+    message.setStringProperty(APP_OPERATOR_ID, appId + "_" + operatorId);
+    message.writeLong(windowId);
+    producer.send(message);
+    logger.debug("Message with windowId {} sent", windowId);
+  }
+
   @Override
   public void beginTransaction()
   {
@@ -166,16 +197,17 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
     logger.debug("Entering connect. is in transaction: {}", inTransaction);
 
     try {
-      String queueName = getQueueName(getAppId(), getOperatorId());
-
       logger.debug("Base is null: {}", getBase() == null);
 
       if (getBase() != null) {
         logger.debug("Session is null: {}", getBase().getSession() == null);
       }
-
-      Queue queue = getBase().getSession().createQueue(queueName);
-      QueueBrowser browser = getBase().getSession().createBrowser(queue);
+      if (metaQueueName == null) {
+        metaQueueName = getBase().getSubject() + ".metadata";
+      }
+      String appOperatorId = getAppId() + "_" + getOperatorId();
+      Queue queue = getBase().getSession().createQueue(metaQueueName);
+      QueueBrowser browser = getBase().getSession().createBrowser(queue, APP_OPERATOR_ID + " = '" + appOperatorId + "'");
       boolean hasStore;
 
       try {
@@ -186,16 +218,14 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
       }
 
       producer = getBase().getSession().createProducer(queue);
-      consumer = getBase().getSession().createConsumer(queue);
+      consumer = getBase().getSession().createConsumer(queue, APP_OPERATOR_ID + " = '" + appOperatorId + "'");
 
       connected = true;
       logger.debug("Connected. is in transaction: {}", inTransaction);
 
       if (!hasStore) {
         beginTransaction();
-        BytesMessage message = getBase().getSession().createBytesMessage();
-        message.writeLong(-1L);
-        producer.send(message);
+        writeWindowId(getAppId(), getOperatorId(), Stateless.WINDOW_ID);
         commitTransaction();
       }
     } catch (JMSException ex) {
@@ -226,9 +256,4 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
   {
     return connected;
   }
-
-  private String getQueueName(String appId, int operatorId)
-  {
-    return appId + "-" + operatorId;
-  }
 }
diff --git a/library/src/test/java/com/datatorrent/lib/io/jms/JMSTransactionableStoreTestBase.java b/library/src/test/java/com/datatorrent/lib/io/jms/JMSTransactionableStoreTestBase.java
index 32fc442..1b8dae0 100644
--- a/library/src/test/java/com/datatorrent/lib/io/jms/JMSTransactionableStoreTestBase.java
+++ b/library/src/test/java/com/datatorrent/lib/io/jms/JMSTransactionableStoreTestBase.java
@@ -46,11 +46,11 @@ public class JMSTransactionableStoreTestBase extends JMSTestBase
   public static final String CLIENT_ID = "Client1";
   public static final String APP_ID = "appId";
   public static final int OPERATOR_ID = 1;
-  public static JMSBaseTransactionableStore store;
-  public static JMSStringSinglePortOutputOperator outputOperator;
+  public static final int OPERATOR_2_ID = 2;
   public static Class<? extends JMSBaseTransactionableStore> storeClass;
 
   public static OperatorContext testOperatorContext;
+  public static OperatorContext testOperator2Context;
 
   public static class TestMeta extends TestWatcher
   {
@@ -61,6 +61,7 @@ public class JMSTransactionableStoreTestBase extends JMSTestBase
       DefaultAttributeMap attributes = new DefaultAttributeMap();
       attributes.put(DAG.APPLICATION_ID, APP_ID);
       testOperatorContext = mockOperatorContext(OPERATOR_ID, attributes);
+      testOperator2Context = mockOperatorContext(OPERATOR_2_ID, attributes);
       FileUtils.deleteQuietly(new File(FSPsuedoTransactionableStore.DEFAULT_RECOVERY_DIRECTORY));
     }
 
@@ -83,17 +84,22 @@ public class JMSTransactionableStoreTestBase extends JMSTestBase
    * This is a helper method to create the output operator. Note this cannot be
    * put in test watcher because because JMS connection issues occur when this code
    * is run from a test watcher.
+   *
+   * @param metaQueueName metaQueueName to set in JMSTransactionableStore
    */
-  private void createOperator()
+  private JMSStringSinglePortOutputOperator createOperator(OperatorContext context, String metaQueueName)
   {
-    outputOperator = new JMSStringSinglePortOutputOperator();
-
+    JMSStringSinglePortOutputOperator outputOperator = new JMSStringSinglePortOutputOperator();
+    JMSBaseTransactionableStore store;
     try {
       store = storeClass.newInstance();
     } catch (InstantiationException | IllegalAccessException ex) {
       throw new RuntimeException(ex);
     }
 
+    if (JMSTransactionableStore.class.equals(storeClass) && metaQueueName != null) {
+      ((JMSTransactionableStore)store).setMetaQueueName(metaQueueName);
+    }
     outputOperator.getConnectionFactoryProperties().put("userName", "");
     outputOperator.getConnectionFactoryProperties().put("password", "");
     outputOperator.getConnectionFactoryProperties().put("brokerURL", "tcp://localhost:61617");
@@ -106,32 +112,27 @@ public class JMSTransactionableStoreTestBase extends JMSTestBase
     outputOperator.setDurable(false);
     outputOperator.setStore(store);
     outputOperator.setVerbose(true);
-    outputOperator.setup(testOperatorContext);
-  }
+    outputOperator.setup(context);
 
-  /**
-   * This is a helper method to teardown a test operator.
-   */
-  private void deleteOperator()
-  {
-    outputOperator.teardown();
+    return outputOperator;
   }
 
-  //@Ignore
   @Test
   public void connectedTest()
   {
-    createOperator();
+    JMSStringSinglePortOutputOperator jmsOutputOperator = createOperator(testOperatorContext, null);
+    JMSBaseTransactionableStore store = jmsOutputOperator.getStore();
+
     Assert.assertTrue("Should be connected.", store.isConnected());
-    deleteOperator();
+    jmsOutputOperator.teardown();
     Assert.assertFalse("Should not be connected.", store.isConnected());
   }
 
-  //@Ignore
   @Test
   public void transactionTest()
   {
-    createOperator();
+    JMSStringSinglePortOutputOperator jmsOutputOperator = createOperator(testOperatorContext, null);
+    JMSBaseTransactionableStore store = jmsOutputOperator.getStore();
 
     Assert.assertFalse("Should not be in transaction.", store.isInTransaction());
     store.beginTransaction();
@@ -139,61 +140,116 @@ public class JMSTransactionableStoreTestBase extends JMSTestBase
     store.commitTransaction();
     Assert.assertFalse("Should not be in transaction.", store.isInTransaction());
 
-    deleteOperator();
+    jmsOutputOperator.teardown();
   }
 
-  //@Ignore
   @Test
   public void storeRetreiveTransactionTest()
   {
-    createOperator();
+    JMSStringSinglePortOutputOperator jmsOutputOperator = createOperator(testOperatorContext, null);
+    JMSBaseTransactionableStore store = jmsOutputOperator.getStore();
 
-    outputOperator.beginWindow(0L);
-    outputOperator.endWindow();
+    jmsOutputOperator.beginWindow(0L);
+    jmsOutputOperator.endWindow();
 
     long windowId = store.getCommittedWindowId(APP_ID, OPERATOR_ID);
     Assert.assertEquals(0L, windowId);
 
-    deleteOperator();
+    jmsOutputOperator.teardown();
+  }
+
+  /**
+   * Creates two operators with different operatorId and same appId to test correct functionality of storing and
+   * retrieving windowId with message selector
+   */
+  @Test
+  public void twoOperatorsStoreRetrieveWithMessageSelectorTransactionTest()
+  {
+    JMSStringSinglePortOutputOperator jmsOutputOperator = createOperator(testOperatorContext, null);
+
+    jmsOutputOperator.beginWindow(0L);
+    jmsOutputOperator.endWindow();
+
+    //Create fresh operator context
+
+    JMSStringSinglePortOutputOperator jmsOutputOperator2 = createOperator(testOperator2Context, null);
+    jmsOutputOperator2.beginWindow(1L);
+    jmsOutputOperator2.endWindow();
+
+    long windowIdOp = jmsOutputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID);
+    Assert.assertEquals(0L, windowIdOp);
+
+    long windowIdOp2 = jmsOutputOperator2.getStore().getCommittedWindowId(APP_ID, OPERATOR_2_ID);
+    Assert.assertEquals(1L, windowIdOp2);
+
+    jmsOutputOperator.teardown();
+    jmsOutputOperator2.teardown();
+  }
+
+  /**
+   * Similar to the test above with using a custom metaQueueName
+   */
+  @Test
+  public void twoOperatorsStoreRetrieveWithMessageSelectorTransactionTestWithCustomMetaQueueName()
+  {
+    JMSStringSinglePortOutputOperator jmsOutputOperator = createOperator(testOperatorContext, "metaQ1");
+
+    jmsOutputOperator.beginWindow(0L);
+    jmsOutputOperator.endWindow();
+
+    //Create fresh operator context
+
+    JMSStringSinglePortOutputOperator jmsOutputOperator2 = createOperator(testOperator2Context, "metaQ2");
+    jmsOutputOperator2.beginWindow(1L);
+    jmsOutputOperator2.endWindow();
+
+    long windowIdOp = jmsOutputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID);
+    Assert.assertEquals(0L, windowIdOp);
+
+    long windowIdOp2 = jmsOutputOperator2.getStore().getCommittedWindowId(APP_ID, OPERATOR_2_ID);
+    Assert.assertEquals(1L, windowIdOp2);
+
+    jmsOutputOperator.teardown();
+    jmsOutputOperator2.teardown();
   }
 
-  ////@Ignore
   @Test
   public void multiWindowTransactionTest()
   {
-    createOperator();
+    JMSStringSinglePortOutputOperator jmsOutputOperator = createOperator(testOperatorContext, null);
+    JMSBaseTransactionableStore store = jmsOutputOperator.getStore();
 
     long windowId = store.getCommittedWindowId(APP_ID, OPERATOR_ID);
     Assert.assertEquals(-1L, windowId);
 
-    outputOperator.beginWindow(0L);
-    outputOperator.endWindow();
+    jmsOutputOperator.beginWindow(0L);
+    jmsOutputOperator.endWindow();
 
     windowId = store.getCommittedWindowId(APP_ID, OPERATOR_ID);
     Assert.assertEquals(0L, windowId);
 
-    outputOperator.beginWindow(1L);
-    outputOperator.endWindow();
+    jmsOutputOperator.beginWindow(1L);
+    jmsOutputOperator.endWindow();
 
     windowId = store.getCommittedWindowId(APP_ID, OPERATOR_ID);
     Assert.assertEquals(1L, windowId);
 
-    outputOperator.beginWindow(2L);
-    outputOperator.endWindow();
+    jmsOutputOperator.beginWindow(2L);
+    jmsOutputOperator.endWindow();
 
     windowId = store.getCommittedWindowId(APP_ID, OPERATOR_ID);
     Assert.assertEquals(2L, windowId);
 
-    outputOperator.beginWindow(3L);
-    outputOperator.endWindow();
+    jmsOutputOperator.beginWindow(3L);
+    jmsOutputOperator.endWindow();
 
     windowId = store.getCommittedWindowId(APP_ID, OPERATOR_ID);
     Assert.assertEquals(3L, windowId);
 
-    outputOperator.beginWindow(4L);
-    outputOperator.endWindow();
+    jmsOutputOperator.beginWindow(4L);
+    jmsOutputOperator.endWindow();
 
-    deleteOperator();
+    jmsOutputOperator.teardown();
   }
 
   @Test
@@ -205,10 +261,11 @@ public class JMSTransactionableStoreTestBase extends JMSTestBase
     listener.setupConnection();
     listener.run();
 
-    createOperator();
+    JMSStringSinglePortOutputOperator jmsOutputOperator = createOperator(testOperatorContext, null);
+    JMSBaseTransactionableStore store = jmsOutputOperator.getStore();
 
     store.beginTransaction();
-    outputOperator.inputPort.put("a");
+    jmsOutputOperator.inputPort.put("a");
 
     Thread.sleep(500);
     Assert.assertEquals(0, listener.receivedData.size());
@@ -217,7 +274,7 @@ public class JMSTransactionableStoreTestBase extends JMSTestBase
     Thread.sleep(500);
     Assert.assertEquals(1, listener.receivedData.size());
 
-    deleteOperator();
+    jmsOutputOperator.teardown();
 
     listener.closeConnection();
   }

-- 
To stop receiving notification emails like this one, please contact
['"commits@apex.apache.org" <co...@apex.apache.org>'].