You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by pr...@apache.org on 2017/07/13 21:11:56 UTC
[apex-malhar] branch master updated: APEXMALHAR-2434 Use
fixed/settable metaQueueName in JMSTransactionableStore. Sanjay Pujare
is a contributing author. This
closes #612.
This is an automated email from the ASF dual-hosted git repository.
pramod pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-malhar.git
The following commit(s) were added to refs/heads/master by this push:
new afb4252 APEXMALHAR-2434 Use fixed/settable metaQueueName in JMSTransactionableStore. Sanjay Pujare <sa...@users.noreply.github.com> is a contributing author. This closes #612.
afb4252 is described below
commit afb4252435745669bc999c8a6b4e7fa2b58cf096
Author: oliverwnk <ol...@gmail.com>
AuthorDate: Wed Jul 12 23:17:37 2017 -0700
APEXMALHAR-2434 Use fixed/settable metaQueueName in JMSTransactionableStore. Sanjay Pujare <sa...@users.noreply.github.com> is a contributing author. This closes #612.
---
.../lib/io/jms/JMSTransactionableStore.java | 75 +++++++----
.../io/jms/JMSTransactionableStoreTestBase.java | 137 +++++++++++++++------
2 files changed, 147 insertions(+), 65 deletions(-)
diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java b/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java
index 11b8447..996bfd4 100644
--- a/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java
+++ b/library/src/main/java/com/datatorrent/lib/io/jms/JMSTransactionableStore.java
@@ -31,9 +31,14 @@ import javax.jms.QueueBrowser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.api.annotation.Stateless;
+
/**
* This transactionable store commits the messages sent within a window along with the windowId of the completed window
- * to JMS. This store ensures that the JMS output operator is capable of outputting data to JMS exactly once.
+ * to JMS. WindowIds will be sent to a subject-specific meta-data queue with the name of the form '{subject}.metadata'.
+ * It is the responsibility of the user to create the meta-data queue in the JMS provider.
+ * A MessageSelector and a unique 'appOperatorId' message property ensure each operator receives its own windowId.
+ * This store ensures that the JMS output operator is capable of outputting data to JMS exactly once.
*
* @since 2.0.0
*/
@@ -44,6 +49,8 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
private transient MessageProducer producer;
private transient MessageConsumer consumer;
+ private String metaQueueName;
+ private static final String APP_OPERATOR_ID = "appOperatorId";
/**
* Indicates whether the store is connected or not.
@@ -58,6 +65,26 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
{
}
+ /**
+ * Get the meta queue name for this store
+ *
+ * @return the metaQueueName
+ */
+ public String getMetaQueueName()
+ {
+ return metaQueueName;
+ }
+
+ /**
+ * Set the meta queue name for this store
+ *
+ * @param metaQueueName the metaQueueName to set
+ */
+ public void setMetaQueueName(String metaQueueName)
+ {
+ this.metaQueueName = metaQueueName;
+ }
+
@Override
@SuppressWarnings("rawtypes")
public long getCommittedWindowId(String appId, int operatorId)
@@ -65,17 +92,15 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
logger.debug("Getting committed windowId appId {} operatorId {}", appId, operatorId);
try {
-
beginTransaction();
BytesMessage message = (BytesMessage)consumer.receive();
- logger.debug("Retrieved committed window message id {}", message.getJMSMessageID());
+ logger.debug("Retrieved committed window messageId: {}, messageAppOperatorIdProp: {}", message.getJMSMessageID(),
+ message.getStringProperty(APP_OPERATOR_ID));
long windowId = message.readLong();
- message = getBase().getSession().createBytesMessage();
- message.writeLong(windowId);
- producer.send(message);
+ writeWindowId(appId, operatorId, windowId);
commitTransaction();
-
+ logger.debug("metaQueueName: " + metaQueueName);
logger.debug("Retrieved windowId {}", windowId);
return windowId;
} catch (JMSException ex) {
@@ -94,10 +119,7 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
appId, operatorId, windowId);
try {
removeCommittedWindowId(appId, operatorId);
- BytesMessage bytesMessage = this.getBase().getSession().createBytesMessage();
- bytesMessage.writeLong(windowId);
- producer.send(bytesMessage);
- logger.debug("Retrieved committed window message id {}", bytesMessage.getJMSMessageID());
+ writeWindowId(appId, operatorId, windowId);
} catch (JMSException ex) {
throw new RuntimeException(ex);
}
@@ -113,6 +135,15 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
}
}
+ private void writeWindowId(String appId, int operatorId, long windowId) throws JMSException
+ {
+ BytesMessage message = getBase().getSession().createBytesMessage();
+ message.setStringProperty(APP_OPERATOR_ID, appId + "_" + operatorId);
+ message.writeLong(windowId);
+ producer.send(message);
+ logger.debug("Message with windowId {} sent", windowId);
+ }
+
@Override
public void beginTransaction()
{
@@ -166,16 +197,17 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
logger.debug("Entering connect. is in transaction: {}", inTransaction);
try {
- String queueName = getQueueName(getAppId(), getOperatorId());
-
logger.debug("Base is null: {}", getBase() == null);
if (getBase() != null) {
logger.debug("Session is null: {}", getBase().getSession() == null);
}
-
- Queue queue = getBase().getSession().createQueue(queueName);
- QueueBrowser browser = getBase().getSession().createBrowser(queue);
+ if (metaQueueName == null) {
+ metaQueueName = getBase().getSubject() + ".metadata";
+ }
+ String appOperatorId = getAppId() + "_" + getOperatorId();
+ Queue queue = getBase().getSession().createQueue(metaQueueName);
+ QueueBrowser browser = getBase().getSession().createBrowser(queue, APP_OPERATOR_ID + " = '" + appOperatorId + "'");
boolean hasStore;
try {
@@ -186,16 +218,14 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
}
producer = getBase().getSession().createProducer(queue);
- consumer = getBase().getSession().createConsumer(queue);
+ consumer = getBase().getSession().createConsumer(queue, APP_OPERATOR_ID + " = '" + appOperatorId + "'");
connected = true;
logger.debug("Connected. is in transaction: {}", inTransaction);
if (!hasStore) {
beginTransaction();
- BytesMessage message = getBase().getSession().createBytesMessage();
- message.writeLong(-1L);
- producer.send(message);
+ writeWindowId(getAppId(), getOperatorId(), Stateless.WINDOW_ID);
commitTransaction();
}
} catch (JMSException ex) {
@@ -226,9 +256,4 @@ public class JMSTransactionableStore extends JMSBaseTransactionableStore
{
return connected;
}
-
- private String getQueueName(String appId, int operatorId)
- {
- return appId + "-" + operatorId;
- }
}
diff --git a/library/src/test/java/com/datatorrent/lib/io/jms/JMSTransactionableStoreTestBase.java b/library/src/test/java/com/datatorrent/lib/io/jms/JMSTransactionableStoreTestBase.java
index 32fc442..1b8dae0 100644
--- a/library/src/test/java/com/datatorrent/lib/io/jms/JMSTransactionableStoreTestBase.java
+++ b/library/src/test/java/com/datatorrent/lib/io/jms/JMSTransactionableStoreTestBase.java
@@ -46,11 +46,11 @@ public class JMSTransactionableStoreTestBase extends JMSTestBase
public static final String CLIENT_ID = "Client1";
public static final String APP_ID = "appId";
public static final int OPERATOR_ID = 1;
- public static JMSBaseTransactionableStore store;
- public static JMSStringSinglePortOutputOperator outputOperator;
+ public static final int OPERATOR_2_ID = 2;
public static Class<? extends JMSBaseTransactionableStore> storeClass;
public static OperatorContext testOperatorContext;
+ public static OperatorContext testOperator2Context;
public static class TestMeta extends TestWatcher
{
@@ -61,6 +61,7 @@ public class JMSTransactionableStoreTestBase extends JMSTestBase
DefaultAttributeMap attributes = new DefaultAttributeMap();
attributes.put(DAG.APPLICATION_ID, APP_ID);
testOperatorContext = mockOperatorContext(OPERATOR_ID, attributes);
+ testOperator2Context = mockOperatorContext(OPERATOR_2_ID, attributes);
FileUtils.deleteQuietly(new File(FSPsuedoTransactionableStore.DEFAULT_RECOVERY_DIRECTORY));
}
@@ -83,17 +84,22 @@ public class JMSTransactionableStoreTestBase extends JMSTestBase
* This is a helper method to create the output operator. Note this cannot be
* put in test watcher because because JMS connection issues occur when this code
* is run from a test watcher.
+ *
+ * @param metaQueueName metaQueueName to set in JMSTransactionableStore
*/
- private void createOperator()
+ private JMSStringSinglePortOutputOperator createOperator(OperatorContext context, String metaQueueName)
{
- outputOperator = new JMSStringSinglePortOutputOperator();
-
+ JMSStringSinglePortOutputOperator outputOperator = new JMSStringSinglePortOutputOperator();
+ JMSBaseTransactionableStore store;
try {
store = storeClass.newInstance();
} catch (InstantiationException | IllegalAccessException ex) {
throw new RuntimeException(ex);
}
+ if (JMSTransactionableStore.class.equals(storeClass) && metaQueueName != null) {
+ ((JMSTransactionableStore)store).setMetaQueueName(metaQueueName);
+ }
outputOperator.getConnectionFactoryProperties().put("userName", "");
outputOperator.getConnectionFactoryProperties().put("password", "");
outputOperator.getConnectionFactoryProperties().put("brokerURL", "tcp://localhost:61617");
@@ -106,32 +112,27 @@ public class JMSTransactionableStoreTestBase extends JMSTestBase
outputOperator.setDurable(false);
outputOperator.setStore(store);
outputOperator.setVerbose(true);
- outputOperator.setup(testOperatorContext);
- }
+ outputOperator.setup(context);
- /**
- * This is a helper method to teardown a test operator.
- */
- private void deleteOperator()
- {
- outputOperator.teardown();
+ return outputOperator;
}
- //@Ignore
@Test
public void connectedTest()
{
- createOperator();
+ JMSStringSinglePortOutputOperator jmsOutputOperator = createOperator(testOperatorContext, null);
+ JMSBaseTransactionableStore store = jmsOutputOperator.getStore();
+
Assert.assertTrue("Should be connected.", store.isConnected());
- deleteOperator();
+ jmsOutputOperator.teardown();
Assert.assertFalse("Should not be connected.", store.isConnected());
}
- //@Ignore
@Test
public void transactionTest()
{
- createOperator();
+ JMSStringSinglePortOutputOperator jmsOutputOperator = createOperator(testOperatorContext, null);
+ JMSBaseTransactionableStore store = jmsOutputOperator.getStore();
Assert.assertFalse("Should not be in transaction.", store.isInTransaction());
store.beginTransaction();
@@ -139,61 +140,116 @@ public class JMSTransactionableStoreTestBase extends JMSTestBase
store.commitTransaction();
Assert.assertFalse("Should not be in transaction.", store.isInTransaction());
- deleteOperator();
+ jmsOutputOperator.teardown();
}
- //@Ignore
@Test
public void storeRetreiveTransactionTest()
{
- createOperator();
+ JMSStringSinglePortOutputOperator jmsOutputOperator = createOperator(testOperatorContext, null);
+ JMSBaseTransactionableStore store = jmsOutputOperator.getStore();
- outputOperator.beginWindow(0L);
- outputOperator.endWindow();
+ jmsOutputOperator.beginWindow(0L);
+ jmsOutputOperator.endWindow();
long windowId = store.getCommittedWindowId(APP_ID, OPERATOR_ID);
Assert.assertEquals(0L, windowId);
- deleteOperator();
+ jmsOutputOperator.teardown();
+ }
+
+ /**
+ * Creates two operators with different operatorId and same appId to test correct functionality of storing and
+ * retrieving windowId with message selector
+ */
+ @Test
+ public void twoOperatorsStoreRetrieveWithMessageSelectorTransactionTest()
+ {
+ JMSStringSinglePortOutputOperator jmsOutputOperator = createOperator(testOperatorContext, null);
+
+ jmsOutputOperator.beginWindow(0L);
+ jmsOutputOperator.endWindow();
+
+ //Create fresh operator context
+
+ JMSStringSinglePortOutputOperator jmsOutputOperator2 = createOperator(testOperator2Context, null);
+ jmsOutputOperator2.beginWindow(1L);
+ jmsOutputOperator2.endWindow();
+
+ long windowIdOp = jmsOutputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID);
+ Assert.assertEquals(0L, windowIdOp);
+
+ long windowIdOp2 = jmsOutputOperator2.getStore().getCommittedWindowId(APP_ID, OPERATOR_2_ID);
+ Assert.assertEquals(1L, windowIdOp2);
+
+ jmsOutputOperator.teardown();
+ jmsOutputOperator2.teardown();
+ }
+
+ /**
+ * Similar to the test above, but using a custom metaQueueName for each operator
+ */
+ @Test
+ public void twoOperatorsStoreRetrieveWithMessageSelectorTransactionTestWithCustomMetaQueueName()
+ {
+ JMSStringSinglePortOutputOperator jmsOutputOperator = createOperator(testOperatorContext, "metaQ1");
+
+ jmsOutputOperator.beginWindow(0L);
+ jmsOutputOperator.endWindow();
+
+ //Create fresh operator context
+
+ JMSStringSinglePortOutputOperator jmsOutputOperator2 = createOperator(testOperator2Context, "metaQ2");
+ jmsOutputOperator2.beginWindow(1L);
+ jmsOutputOperator2.endWindow();
+
+ long windowIdOp = jmsOutputOperator.getStore().getCommittedWindowId(APP_ID, OPERATOR_ID);
+ Assert.assertEquals(0L, windowIdOp);
+
+ long windowIdOp2 = jmsOutputOperator2.getStore().getCommittedWindowId(APP_ID, OPERATOR_2_ID);
+ Assert.assertEquals(1L, windowIdOp2);
+
+ jmsOutputOperator.teardown();
+ jmsOutputOperator2.teardown();
}
- ////@Ignore
@Test
public void multiWindowTransactionTest()
{
- createOperator();
+ JMSStringSinglePortOutputOperator jmsOutputOperator = createOperator(testOperatorContext, null);
+ JMSBaseTransactionableStore store = jmsOutputOperator.getStore();
long windowId = store.getCommittedWindowId(APP_ID, OPERATOR_ID);
Assert.assertEquals(-1L, windowId);
- outputOperator.beginWindow(0L);
- outputOperator.endWindow();
+ jmsOutputOperator.beginWindow(0L);
+ jmsOutputOperator.endWindow();
windowId = store.getCommittedWindowId(APP_ID, OPERATOR_ID);
Assert.assertEquals(0L, windowId);
- outputOperator.beginWindow(1L);
- outputOperator.endWindow();
+ jmsOutputOperator.beginWindow(1L);
+ jmsOutputOperator.endWindow();
windowId = store.getCommittedWindowId(APP_ID, OPERATOR_ID);
Assert.assertEquals(1L, windowId);
- outputOperator.beginWindow(2L);
- outputOperator.endWindow();
+ jmsOutputOperator.beginWindow(2L);
+ jmsOutputOperator.endWindow();
windowId = store.getCommittedWindowId(APP_ID, OPERATOR_ID);
Assert.assertEquals(2L, windowId);
- outputOperator.beginWindow(3L);
- outputOperator.endWindow();
+ jmsOutputOperator.beginWindow(3L);
+ jmsOutputOperator.endWindow();
windowId = store.getCommittedWindowId(APP_ID, OPERATOR_ID);
Assert.assertEquals(3L, windowId);
- outputOperator.beginWindow(4L);
- outputOperator.endWindow();
+ jmsOutputOperator.beginWindow(4L);
+ jmsOutputOperator.endWindow();
- deleteOperator();
+ jmsOutputOperator.teardown();
}
@Test
@@ -205,10 +261,11 @@ public class JMSTransactionableStoreTestBase extends JMSTestBase
listener.setupConnection();
listener.run();
- createOperator();
+ JMSStringSinglePortOutputOperator jmsOutputOperator = createOperator(testOperatorContext, null);
+ JMSBaseTransactionableStore store = jmsOutputOperator.getStore();
store.beginTransaction();
- outputOperator.inputPort.put("a");
+ jmsOutputOperator.inputPort.put("a");
Thread.sleep(500);
Assert.assertEquals(0, listener.receivedData.size());
@@ -217,7 +274,7 @@ public class JMSTransactionableStoreTestBase extends JMSTestBase
Thread.sleep(500);
Assert.assertEquals(1, listener.receivedData.size());
- deleteOperator();
+ jmsOutputOperator.teardown();
listener.closeConnection();
}
--
To stop receiving notification emails like this one, please contact
['"commits@apex.apache.org" <co...@apex.apache.org>'].