You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by bh...@apache.org on 2016/10/26 13:20:49 UTC

apex-malhar git commit: APEXMALHAR-2291 Fix for Exactly-once processing of JdbcPOJOInsertOutput Operator. Added a check in endWindow() to not to commit if committed window id is greater than current window id.

Repository: apex-malhar
Updated Branches:
  refs/heads/master 2b775968a -> 5ae58d039


APEXMALHAR-2291 Fix for Exactly-once processing of JdbcPOJOInsertOutput Operator.
Added a check in endWindow() to not to commit if committed window id is greater than current window id.


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/5ae58d03
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/5ae58d03
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/5ae58d03

Branch: refs/heads/master
Commit: 5ae58d039dfba2b9debcc8e42f4f328e34d5bdc2
Parents: 2b77596
Author: Hitesh-Scorpio <fo...@gmail.com>
Authored: Fri Oct 14 20:45:38 2016 +0530
Committer: Hitesh-Scorpio <fo...@gmail.com>
Committed: Wed Oct 26 17:11:58 2016 +0530

----------------------------------------------------------------------
 ...sThruTransactionableStoreOutputOperator.java |  8 +-
 .../lib/db/jdbc/JdbcPojoOperatorTest.java       | 92 ++++++++++++++++++++
 2 files changed, 97 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5ae58d03/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java
index b471a63..d21bd01 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractPassThruTransactionableStoreOutputOperator.java
@@ -49,9 +49,11 @@ public abstract class AbstractPassThruTransactionableStoreOutputOperator<T, S ex
   @Override
   public void endWindow()
   {
-    store.storeCommittedWindowId(appId, operatorId, currentWindowId);
-    store.commitTransaction();
-    committedWindowId = currentWindowId;
+    if ( committedWindowId < currentWindowId ) {
+      store.storeCommittedWindowId(appId, operatorId, currentWindowId);
+      store.commitTransaction();
+      committedWindowId = currentWindowId;
+    }
     super.endWindow();
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5ae58d03/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java
index e6d8b42..91cb2f2 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoOperatorTest.java
@@ -335,6 +335,98 @@ public class JdbcPojoOperatorTest extends JdbcOperatorTest
   }
 
   /**
+   * This test will assume direct mapping for POJO fields to DB columns All
+   * fields in DB present in POJO and will test it for exactly once criteria
+   */
+  @Test
+  public void testJdbcPojoInsertOutputOperatorExactlyOnce()
+  {
+    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
+    transactionalStore.setDatabaseDriver(DB_DRIVER);
+    transactionalStore.setDatabaseUrl(URL);
+
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(
+        OPERATOR_ID, attributeMap);
+
+    TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
+    outputOperator.setBatchSize(3);
+    outputOperator.setTablename(TABLE_POJO_NAME);
+
+    outputOperator.setStore(transactionalStore);
+
+    outputOperator.setup(context);
+
+    Attribute.AttributeMap.DefaultAttributeMap portAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    portAttributes.put(Context.PortContext.TUPLE_CLASS, TestPOJOEvent.class);
+    TestPortContext tpc = new TestPortContext(portAttributes);
+    outputOperator.input.setup(tpc);
+
+    CollectorTestSink<Object> errorSink = new CollectorTestSink<>();
+    TestUtils.setSink(outputOperator.error, errorSink);
+
+    outputOperator.activate(context);
+
+    List<TestPOJOEvent> events = Lists.newArrayList();
+    for (int i = 0; i < 70; i++) {
+      events.add(new TestPOJOEvent(i, "test" + i));
+    }
+
+    outputOperator.beginWindow(0);
+    for (int i = 0; i < 10; i++) {
+      outputOperator.input.process(events.get(i));
+    }
+    outputOperator.endWindow();
+
+    outputOperator.beginWindow(1);
+    for (int i = 10; i < 20; i++) {
+      outputOperator.input.process(events.get(i));
+    }
+    outputOperator.endWindow();
+
+    outputOperator.beginWindow(2);
+    for (int i = 20; i < 30; i++) {
+      outputOperator.input.process(events.get(i));
+    }
+    outputOperator.endWindow();
+
+    outputOperator.setup(context);
+    outputOperator.input.setup(tpc);
+    outputOperator.activate(context);
+
+    outputOperator.beginWindow(0);
+    for (int i = 30; i < 40; i++) {
+      outputOperator.input.process(events.get(i));
+    }
+    outputOperator.endWindow();
+
+    outputOperator.beginWindow(1);
+    for (int i = 40; i < 50; i++) {
+      outputOperator.input.process(events.get(i));
+    }
+    outputOperator.endWindow();
+
+    outputOperator.beginWindow(2);
+    for (int i = 50; i < 60; i++) {
+      outputOperator.input.process(events.get(i));
+    }
+
+    outputOperator.beginWindow(3);
+    for (int i = 60; i < 70; i++) {
+      outputOperator.input.process(events.get(i));
+    }
+    outputOperator.endWindow();
+
+    outputOperator.deactivate();
+    outputOperator.teardown();
+
+    Assert.assertEquals("rows in db", 40, outputOperator.getNumOfEventsInStore(TABLE_POJO_NAME));
+
+  }
+
+
+  /**
    * This test will assume direct mapping for POJO fields to DB columns Nullable
    * DB field missing in POJO name1 field, which is nullable in DB is missing
    * from POJO POJO(id, name) -> DB(id, name1)