You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by bh...@apache.org on 2016/10/26 13:20:49 UTC
apex-malhar git commit: APEXMALHAR-2291 Fix for Exactly-once
processing of JdbcPOJOInsertOutput Operator. Added a check in endWindow() to
not to commit if committed window id is greater than current window id.
Repository: apex-malhar
Updated Branches:
refs/heads/master 2b775968a -> 5ae58d039
APEXMALHAR-2291 Fix for Exactly-once processing of JdbcPOJOInsertOutput Operator.
Added a check in endWindow() to not to commit if committed window id is greater than current window id.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/5ae58d03
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/5ae58d03
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/5ae58d03
Branch: refs/heads/master
Commit: 5ae58d039dfba2b9debcc8e42f4f328e34d5bdc2
Parents: 2b77596
Author: Hitesh-Scorpio <fo...@gmail.com>
Authored: Fri Oct 14 20:45:38 2016 +0530
Committer: Hitesh-Scorpio <fo...@gmail.com>
Committed: Wed Oct 26 17:11:58 2016 +0530
----------------------------------------------------------------------
...sThruTransactionableStoreOutputOperator.java | 8 +-
.../lib/db/jdbc/JdbcPojoOperatorTest.java | 92 ++++++++++++++++++++
2 files changed, 97 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5ae58d03/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java
index b471a63..d21bd01 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java
@@ -49,9 +49,11 @@ public abstract class AbstractPassThruTransactionableStoreOutputOperator<T, S ex
@Override
public void endWindow()
{
- store.storeCommittedWindowId(appId, operatorId, currentWindowId);
- store.commitTransaction();
- committedWindowId = currentWindowId;
+ if ( committedWindowId < currentWindowId ) {
+ store.storeCommittedWindowId(appId, operatorId, currentWindowId);
+ store.commitTransaction();
+ committedWindowId = currentWindowId;
+ }
super.endWindow();
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5ae58d03/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java
index e6d8b42..91cb2f2 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java
@@ -335,6 +335,98 @@ public class JdbcPojoOperatorTest extends JdbcOperatorTest
}
/**
+ * This test will assume direct mapping for POJO fields to DB columns All
+ * fields in DB present in POJO and will test it for exactly once criteria
+ */
+ @Test
+ public void testJdbcPojoInsertOutputOperatorExactlyOnce()
+ {
+ JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
+ transactionalStore.setDatabaseDriver(DB_DRIVER);
+ transactionalStore.setDatabaseUrl(URL);
+
+ com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+ attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+ OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
+ OPERATOR_ID, attributeMap);
+
+ TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
+ outputOperator.setBatchSize(3);
+ outputOperator.setTablename(TABLE_POJO_NAME);
+
+ outputOperator.setStore(transactionalStore);
+
+ outputOperator.setup(context);
+
+ Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+ TestPortContext tpc = new TestPortContext(portAttributes);
+ outputOperator.input.setup(tpc);
+
+ CollectorTestSink<Object> errorSink = new CollectorTestSink<>();
+ TestUtils.setSink(outputOperator.error, errorSink);
+
+ outputOperator.activate(context);
+
+ List<TestPOJOEvent> events = Lists.newArrayList();
+ for (int i = 0; i < 70; i++) {
+ events.add(new TestPOJOEvent(i, "test" + i));
+ }
+
+ outputOperator.beginWindow(0);
+ for (int i = 0; i < 10; i++) {
+ outputOperator.input.process(events.get(i));
+ }
+ outputOperator.endWindow();
+
+ outputOperator.beginWindow(1);
+ for (int i = 10; i < 20; i++) {
+ outputOperator.input.process(events.get(i));
+ }
+ outputOperator.endWindow();
+
+ outputOperator.beginWindow(2);
+ for (int i = 20; i < 30; i++) {
+ outputOperator.input.process(events.get(i));
+ }
+ outputOperator.endWindow();
+
+ outputOperator.setup(context);
+ outputOperator.input.setup(tpc);
+ outputOperator.activate(context);
+
+ outputOperator.beginWindow(0);
+ for (int i = 30; i < 40; i++) {
+ outputOperator.input.process(events.get(i));
+ }
+ outputOperator.endWindow();
+
+ outputOperator.beginWindow(1);
+ for (int i = 40; i < 50; i++) {
+ outputOperator.input.process(events.get(i));
+ }
+ outputOperator.endWindow();
+
+ outputOperator.beginWindow(2);
+ for (int i = 50; i < 60; i++) {
+ outputOperator.input.process(events.get(i));
+ }
+
+ outputOperator.beginWindow(3);
+ for (int i = 60; i < 70; i++) {
+ outputOperator.input.process(events.get(i));
+ }
+ outputOperator.endWindow();
+
+ outputOperator.deactivate();
+ outputOperator.teardown();
+
+ Assert.assertEquals("rows in db", 40, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME));
+
+ }
+
+
+ /**
* This test will assume direct mapping for POJO fields to DB columns Nullable
* DB field missing in POJO name1 field, which is nullable in DB is missing
* from POJO POJO(id, name) -> DB(id, name1)