You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by fp...@apache.org on 2016/05/18 19:34:36 UTC

[1/7] incubator-omid git commit: [ci skip]prepare for next development iteration [Forced Update!]

Repository: incubator-omid
Updated Branches:
  refs/heads/master 104697504 -> b77cce396 (forced update)


[ci skip]prepare for next development iteration

Change-Id: Id9f05c50fea98b33be83a934b89bab81c7ad1762


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/a8bee71b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/a8bee71b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/a8bee71b

Branch: refs/heads/master
Commit: a8bee71b112c1eac836618945c11a73422eef096
Parents: 8ff9fff
Author: Igor Katkov <ka...@yahoo-inc.com>
Authored: Thu May 12 12:52:09 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 18 12:26:23 2016 -0700

----------------------------------------------------------------------
 benchmarks/pom.xml          | 2 +-
 codahale-metrics/pom.xml    | 2 +-
 commit-table/pom.xml        | 2 +-
 common/pom.xml              | 2 +-
 examples/pom.xml            | 2 +-
 hbase-client/pom.xml        | 2 +-
 hbase-commit-table/pom.xml  | 2 +-
 hbase-common/pom.xml        | 2 +-
 hbase-coprocessor/pom.xml   | 2 +-
 hbase-shims/hbase-0/pom.xml | 2 +-
 hbase-shims/hbase-1/pom.xml | 2 +-
 hbase-shims/pom.xml         | 2 +-
 hbase-tools/pom.xml         | 2 +-
 metrics/pom.xml             | 2 +-
 pom.xml                     | 2 +-
 statemachine/pom.xml        | 2 +-
 timestamp-storage/pom.xml   | 2 +-
 transaction-client/pom.xml  | 2 +-
 tso-server/pom.xml          | 2 +-
 19 files changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/benchmarks/pom.xml
----------------------------------------------------------------------
diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml
index 62cce83..e24d342 100644
--- a/benchmarks/pom.xml
+++ b/benchmarks/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.omid</groupId>
         <artifactId>omid</artifactId>
-        <version>0.8.1.53-SNAPSHOT</version>
+        <version>0.8.2.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>benchmarks</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/codahale-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/codahale-metrics/pom.xml b/codahale-metrics/pom.xml
index ba5d560..bafcdc8 100644
--- a/codahale-metrics/pom.xml
+++ b/codahale-metrics/pom.xml
@@ -4,7 +4,7 @@
     <parent>
         <artifactId>omid</artifactId>
         <groupId>org.apache.omid</groupId>
-        <version>0.8.1.53-SNAPSHOT</version>
+        <version>0.8.2.1-SNAPSHOT</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/commit-table/pom.xml
----------------------------------------------------------------------
diff --git a/commit-table/pom.xml b/commit-table/pom.xml
index 5cc7be0..d904359 100644
--- a/commit-table/pom.xml
+++ b/commit-table/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.omid</groupId>
         <artifactId>omid</artifactId>
-        <version>0.8.1.53-SNAPSHOT</version>
+        <version>0.8.2.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>commit-table</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 5e84383..b0cbc79 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.omid</groupId>
         <artifactId>omid</artifactId>
-        <version>0.8.1.53-SNAPSHOT</version>
+        <version>0.8.2.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>common</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 4a704cf..fdfb0fa 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -6,7 +6,7 @@
     <parent>
         <artifactId>omid</artifactId>
         <groupId>org.apache.omid</groupId>
-        <version>0.8.1.53-SNAPSHOT</version>
+        <version>0.8.2.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>examples</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/hbase-client/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml
index 01818ba..4e2e55d 100644
--- a/hbase-client/pom.xml
+++ b/hbase-client/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.omid</groupId>
         <artifactId>omid</artifactId>
-        <version>0.8.1.53-SNAPSHOT</version>
+        <version>0.8.2.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>hbase-client</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/hbase-commit-table/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-commit-table/pom.xml b/hbase-commit-table/pom.xml
index c0b5104..4bd7a89 100644
--- a/hbase-commit-table/pom.xml
+++ b/hbase-commit-table/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.omid</groupId>
         <artifactId>omid</artifactId>
-        <version>0.8.1.53-SNAPSHOT</version>
+        <version>0.8.2.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>hbase-commit-table</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/hbase-common/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-common/pom.xml b/hbase-common/pom.xml
index 76a1272..3373702 100644
--- a/hbase-common/pom.xml
+++ b/hbase-common/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.omid</groupId>
         <artifactId>omid</artifactId>
-        <version>0.8.1.53-SNAPSHOT</version>
+        <version>0.8.2.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>hbase-common</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/hbase-coprocessor/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/pom.xml b/hbase-coprocessor/pom.xml
index 7dd7f16..d14d8dc 100644
--- a/hbase-coprocessor/pom.xml
+++ b/hbase-coprocessor/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.omid</groupId>
         <artifactId>omid</artifactId>
-        <version>0.8.1.53-SNAPSHOT</version>
+        <version>0.8.2.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>hbase-coprocessor</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/hbase-shims/hbase-0/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-shims/hbase-0/pom.xml b/hbase-shims/hbase-0/pom.xml
index 84659de..4593485 100644
--- a/hbase-shims/hbase-0/pom.xml
+++ b/hbase-shims/hbase-0/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.omid</groupId>
         <artifactId>omid-shims-aggregator</artifactId>
-        <version>0.8.1.53-SNAPSHOT</version>
+        <version>0.8.2.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>hbase0-shims</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/hbase-shims/hbase-1/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-shims/hbase-1/pom.xml b/hbase-shims/hbase-1/pom.xml
index 924c5c7..e16494e 100644
--- a/hbase-shims/hbase-1/pom.xml
+++ b/hbase-shims/hbase-1/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.omid</groupId>
         <artifactId>omid-shims-aggregator</artifactId>
-        <version>0.8.1.53-SNAPSHOT</version>
+        <version>0.8.2.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>hbase1-shims</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/hbase-shims/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-shims/pom.xml b/hbase-shims/pom.xml
index 77fca87..3d2aa99 100644
--- a/hbase-shims/pom.xml
+++ b/hbase-shims/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.omid</groupId>
         <artifactId>omid</artifactId>
-        <version>0.8.1.53-SNAPSHOT</version>
+        <version>0.8.2.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>omid-shims-aggregator</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/hbase-tools/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-tools/pom.xml b/hbase-tools/pom.xml
index 762b021..7c57978 100644
--- a/hbase-tools/pom.xml
+++ b/hbase-tools/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.omid</groupId>
         <artifactId>omid</artifactId>
-        <version>0.8.1.53-SNAPSHOT</version>
+        <version>0.8.2.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>hbase-tools</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/metrics/pom.xml
----------------------------------------------------------------------
diff --git a/metrics/pom.xml b/metrics/pom.xml
index a22d924..577b699 100644
--- a/metrics/pom.xml
+++ b/metrics/pom.xml
@@ -6,7 +6,7 @@
     <parent>
         <artifactId>omid</artifactId>
         <groupId>org.apache.omid</groupId>
-        <version>0.8.1.53-SNAPSHOT</version>
+        <version>0.8.2.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>metrics</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 652df9c..d6137e9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -13,7 +13,7 @@
     <artifactId>omid</artifactId>
     <packaging>pom</packaging>
     <!-- WARNING: do not update version manually, use mvn versions:set -->
-    <version>0.8.1.53-SNAPSHOT</version>
+    <version>0.8.2.1-SNAPSHOT</version>
 
     <organization>
         <name>Apache Software Foundation</name>

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/statemachine/pom.xml
----------------------------------------------------------------------
diff --git a/statemachine/pom.xml b/statemachine/pom.xml
index e5c47d6..3cf875b 100644
--- a/statemachine/pom.xml
+++ b/statemachine/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.omid</groupId>
         <artifactId>omid</artifactId>
-        <version>0.8.1.53-SNAPSHOT</version>
+        <version>0.8.2.1-SNAPSHOT</version>
     </parent>
 
     <name>State Machine</name>

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/timestamp-storage/pom.xml
----------------------------------------------------------------------
diff --git a/timestamp-storage/pom.xml b/timestamp-storage/pom.xml
index 03b07b5..f0d777a 100644
--- a/timestamp-storage/pom.xml
+++ b/timestamp-storage/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.omid</groupId>
         <artifactId>omid</artifactId>
-        <version>0.8.1.53-SNAPSHOT</version>
+        <version>0.8.2.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>timestamp-storage</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/transaction-client/pom.xml
----------------------------------------------------------------------
diff --git a/transaction-client/pom.xml b/transaction-client/pom.xml
index fe038c4..2be9e67 100644
--- a/transaction-client/pom.xml
+++ b/transaction-client/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.omid</groupId>
         <artifactId>omid</artifactId>
-        <version>0.8.1.53-SNAPSHOT</version>
+        <version>0.8.2.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>transaction-client</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/tso-server/pom.xml
----------------------------------------------------------------------
diff --git a/tso-server/pom.xml b/tso-server/pom.xml
index a7dfdf5..86800e5 100644
--- a/tso-server/pom.xml
+++ b/tso-server/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <groupId>org.apache.omid</groupId>
         <artifactId>omid</artifactId>
-        <version>0.8.1.53-SNAPSHOT</version>
+        <version>0.8.2.1-SNAPSHOT</version>
     </parent>
 
     <artifactId>tso-server</artifactId>


[7/7] incubator-omid git commit: [ci skip] Add 4096 keys in KEYS

Posted by fp...@apache.org.
[ci skip] Add 4096 keys in KEYS

Change-Id: I4d97845d423f9b15f7b1ce223e70f3c756dd34c3


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/b77cce39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/b77cce39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/b77cce39

Branch: refs/heads/master
Commit: b77cce3960151086c53b1e58f4c01c42cc75cd68
Parents: fb63cf3
Author: Igor Katkov <ka...@yahoo-inc.com>
Authored: Wed May 18 12:01:31 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 18 12:32:48 2016 -0700

----------------------------------------------------------------------
 KEYS | 113 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 113 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/b77cce39/KEYS
----------------------------------------------------------------------
diff --git a/KEYS b/KEYS
new file mode 100644
index 0000000..9142a46
--- /dev/null
+++ b/KEYS
@@ -0,0 +1,113 @@
+This file contains the GPG keys of Apache Omid developers.
+
+Users:    gpg --import KEYS
+
+Developers:
+  Create a key:
+    gpg --gen-key
+
+  Adding you key to this file:
+    (gpg --fingerprint <key id> && gpg --armor --export <key id>) >> this file.
+
+  Publish the key:
+    gpg --keyserver pgp.mit.edu --send-keys <key id>
+
+  Signing another developers key:
+    gpg --keyserver pgp.mit.edu --search-keys <name or email>
+    gpg --keyserver pgp.mit.edu --recv-keys <key id>
+    gpg --sign-key <key id>
+    gpg --keyserver pgp.mit.edu --send-keys <key id>
+
+  Additional Information:
+    http://www.apache.org/dev/openpgp.html#generate-key
+
+********************************* PLEASE NOTE **********************************
+
+  Releases will be signed using one of these keys in this file. This file will
+  be available with the distributed Apache Omid releases at:
+
+      https://dist.apache.org/repos/dist/release/incubator/omid/KEYS
+
+********************************************************************************
+
+pub   4096R/7DDE0136 2016-05-13
+      Key fingerprint = 719B 6514 E694 433F 704E  221C 6407 EEB3 7DDE 0136
+      uid       [ultimate] Igor Katkov <ik...@apache.org>
+      sub   4096R/ED70F1F3 2016-05-13
+
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v2
+
+mQENBFc0/q4BCADC/WsQwFK9hFAsE2a2jR1hffUMAPOz+yu2r0c7UyRyCB6Ti+2i
+m7eHNrml0piad7JVxkw6yyPMxL3P7jH9emWCFTmhGenAtLrbRn8nv9CKKxHTCdkY
+lp2HRFcXqSybr9OiuxELKMxtatpzfNkhBLNpVtYjyNJSPU51n7aW/SeJwaaPC+22
+7TajGB5SbUrSSEXlanpIzXD1AFzrUCZ/EQBSAfQsIWtum4tiCZt7KonOgCkMrshR
+qmmAof1SXYSIGN8hOEkVW/GaFLqELKndLG02QbcNtf/ASlw6K4cWXBvJ1ddokZk8
+SdxCj22quyerrxJiJ5M3Fo6uwkBoBzh+bbZXABEBAAG0IElnb3IgS2F0a292IDxp
+a2F0a292QGFwYWNoZS5vcmc+iQE4BBMBAgAiBQJXNP6uAhsDBgsJCAcDAgYVCAIJ
+CgsEFgIDAQIeAQIXgAAKCRDg1uSyxqugq1NzB/0dlItBpTydQB7RhMBqzbpP1QXr
+MHtgdQDJBckau6IP0ITTXompLsxJCbH4hrfecpUyF1KGWq5lU+KAQNkL9uydlzrx
+eejbyz0uOpuquWSPfj93zoFgSFrfNnCPQNTRvbLKg/YpO3z+CkDcWlPW/Zob7k8Y
+jAHpzVP0bc21Vvu3gmIBUrttHlUtogjU/gP2EtoWs1YO3vqZEUAQG0PbiWGvqBop
+ki7xGyadwbmQJfeaxCkhHunChBsJSl/ev/wowwMSXEXyzBjzb7iJDZmtjWzdat5x
+d8hHGgEWDf4mPKhw2aIaLEkIUCaBiE+sYvPLe/cIMmoQPZ7V0T8Cq12EXdjhuQEN
+BFc0/q4BCADGx5W0e53RmbqJh2K7w6JqNtFEC6s1wD9IixnxRbUQHqL4lduUBdcW
+R4umSp1o3SK7rA2QnKHarNNY+x9AKtUxC/wGH59yv6omlE9HnAudVReGh5hvIip2
+KDzvFbBPfgJR6jI+RVdCf9NJjm1HQVFV20BK5XilkwYYeK7C7ioH27FvWSWWeVDj
+Y022yT0QAEa9NRUWIAE/YZleazbZ/0O++NRm2fiBD4592/WvzcCqvtsNRDzUfg5c
+wY0p7oiqTYl0lm9bsGLxV7hD0crIVTm+qPIIDDhgzSfJWke1sFZGAD9s+WwJ/EuU
+hPPO2iFxmn3QcpmLOEKAbPXTOVZ3FmyLABEBAAGJAR8EGAECAAkFAlc0/q4CGwwA
+CgkQ4NbkssaroKstGgf/fHFUm6diXe4MOLLBNpxGWrG+6rc3v42v/QzObkQAZSyu
+1ml4RfunzWQjNN3IoIQ6yvSrLM6gFavTT+uhu/2y6HMiSqNyBBMixVKUoOpd/HHh
+lHtz72kyYeDlaNKre4IUvA7Y+9sdkM1t2YyE6AuGWYF/55R699112AqH4B5WHSzj
+rgglzECeZmG88Y9V020axhmpzOC3HdiQOujZHCq8Gvc58nbxaf8wsvZZ59E3YjUD
+M4eOr1YZ6BQnrxy0jNnKPwXL+lI68S38Fy9IaGEeXH4Rv03qmeaeAucvyTMzB9Kw
++Cd1v5MmlWKAPPC1g8TmnGGoR+9ptWWL35D48WlwsJkCDQRXNlXvARAA20nu3iVj
+yZPlyrMmimEvtPPnSSM/dURdQG2Gs6E6XvU7F8r2BkzH6PImIYxAWlGeyesaPymh
+NfMxG7Sz5FiLKtlFpK/iVKqEdXxP6U57LVleR3hisT0rO/C4CZ53AdwF9YhYz625
+zDEq4AkX94q74RC2pz7AA2LgLunYuwdns31SoRL/fnfJnqOqEdiyS5n395pzSKaD
+fbcLaq+81wQQtmkQNv3B/JmJsjkx5WKAfweImuRERDrbOpmWW8hHLDZ8etberG8T
+aIAkL4sIQTmZ+EGwkmLUBgWHmxDwMlJW7ECm+qDxvvt42onQUHsMVBbAORSRGdtN
+omO0j/InKeHh6EZbsGiuUqM5/oJJTQd+qyp0icOAjYzbjF9TA22ChfqVjn8TrFbf
+97G7XFIsxdrv9OdmtqzsBk4Ds3H2/mh0d7HUm2VDQ4nsDxevenY7MSLDhNCjE8wX
+mgpmxsm4GXVY2yLIaV6wi1wzP6XghIqS/jXpL4ymTF8s1PoEG+a8mG2QjDVu0W2E
+TbqMGAjvnQTdwVkRjkFPueeGssn0F1yeevuP6eaxGD4youFzwivQc7jsqXonUF0A
+Uop8qQzlp9JLk3qPvHHtDOmtsrR0TxteGVzrJ1m54LuYEqYRbIM3KxOVjphIdR79
+pA7s9pfXyhINty8dlWoxJaC46i24w3BapT0AEQEAAbQgSWdvciBLYXRrb3YgPGlr
+YXRrb3ZAYXBhY2hlLm9yZz6JAjkEEwEIACMFAlc2Ve8CGwMHCwkIBwMCAQYVCAIJ
+CgsEFgIDAQIeAQIXgAAKCRBkB+6zfd4BNmJUD/9K5Ktqm55epkW47ithg9Sk1o6/
+AAqMp56aU7xJRzeHRqsg3n7Z+1nP2LYFZo+so3NJ9RH40xPBD4UF8W2fu+Ks04Zg
+y5IhJuSYxyFzVGulCsiSFVQsAmb4qqBHMFvXmi7gt7W6f7tYvwbqY/RT7uWRmg5K
+HlH3WtBYwoBJ541eNE9WCScQ16R/+CQMoYzCYRf1woKyGe5wqu2SEShh8T/Qdn3n
+jjJc1Y1JTSYI/KobLVkvVop0Z9gG+yzj/4boiik/SHm8D/ZM4Xlx5x3Ju5foqHDT
+n6p1NRdDY8ryE656opxKa7fsTD8t1SUHgpX/zH10YLVwhUcsTJRyHZhcKkgrzohz
+FGAAGPsEhHXO15A7vAGxKIkCgWuN04JITsYAOzTiXnzMhMnDn9zi0tz1dw2Bf+i9
+sbBbLxrHTCarj90tRtcgn8hiFTpBgEMySMjaPUWt1RZeesAN60yKr3qWwDa65ZED
+mIIGJ0l1Dfz1LJuLyCjDEQKC38yhgrVttYIfzbn/9V95vz+xOWauCeSszF497zP4
+Gx8Q7w9SzmYwT1cv4ZsHEEHl1ryF59mOwfS/ZzYjs4qksQaSB53Ah1BvJ3Qxmpwe
+bA8UegF16uFAdJFsG8158nA9YPNrHXFKc2ltEd5EHacesE1FG2J1ZpSg4Bcgjb7N
+1zNJnvzv8vfKnQAuX7kCDQRXNlXvARAAzc7hecGPv1Zgph/8/PSCX0geWat5i19Q
+exgW9+lZ4TSO0Nr/TO7kShXRsMX6YQR5lEPPGZQqvAyyZTNcB5IjnCyamiarhgGd
+PRqvWqkUCsvwPTFe/OKUx2BWuze0ivE4V5x2lhZansEusT+E3KfdV5dU5eDN/2gl
+s+TZ7yX0X1lxjWM1UCipBo5QKGaViodBYGJKYdZcNDtfJq64hsG+N1/OBoA6+jwW
+41wmv1N3TZYPzrr08Wv3vm3rTlXQMMBZGKluKxzboOtPbmLwu4LwYw5L0UrN18Qj
+kUk8eECYHk38eGrZDhRrvdG4bNGOVMYMUajhrD41SsB0Vk5mZK7ETYWbEnRJc41c
+y4p2Y69+lMdbqKTJ7mPw1+co3YqLfQJePqryBskrHmhJWELBtxybidh8VaUz5i0w
+qD+OPA0mnDX8zkuwl3UV6x7vTQql35iGCaDGnyNN5bIVFfxQF7x9mtZXQtf1nsQx
+I8A1lNvIXIe1lR8MPH2RavpQrEl0GzC1DK23zRviPBHyf7IezjF+B/8IzrlXD19u
+7sJ6YlZyaHlHUqZXJPUVKbM/AqSfbKpXmYeWXuIFaMOrF2ZWWt9V/r/duGZk1C69
+5X8uqHvfjuVZxE1bh2Xilo2h0adQFBZ3jBAPHDPh0GAppIMOhY7z91rUHNfnkobv
+mVUg7eU7apsAEQEAAYkCHwQYAQgACQUCVzZV7wIbDAAKCRBkB+6zfd4BNgu8D/sH
++1umPcnjGOXtD2W6FX6t3C3TzSOJ0GZv5ycv/9OKTBFw2HCwtLYWcbwY60Jgo8h5
+syLYfYGj5tQwtSLKx/6MtNPTK+JFK7++/OikQ7pFCeZvphP22JBs7sWfuR6TK/kJ
+NSNwNUoICBvKY03hnma4XVUHgadXJ9pfHZ49TaC+/ZkKYh2dLbNmVBHSI5rcjpD7
+YY6m+DjVXbDjr6mA8okcF/IpNuEs4l1BEy0YkbxR37UrPt9CBYOQwq+9j7MymoaQ
+75ZGZPhL+e44vwUVCPCvsdwcMhEtmAg5VbZ95q/eE201bKnIV/7ipkhlRs7iyTqS
+KnVeCcnowTFUQi3SpEuxK6gcm8mN28Nq2RpOdy5KV8GlVnN1wu5quWYw0YsjO+bc
+9d7Pyfb7inCzwWysG1z9H9vFLBbAnGhanwyeLPoyoVDjN+1egqesa6e8AQ8nIN1g
+oW4oidKktQkX8ca5LriWgS8/6H0S1C08SIhom11R3eA1sfJ6OabSomAKBVXlNRsq
+6ZPs31DelnPFlnCwF3d3PeL720sPGsaCdeChTFqWYbbferHCKoEdmYcrqVAxsG2q
+Io2Cs9jOV2iU4yYf3dJgUiBR/Vq4Dt0AqNzPTNojBAuAUOP+d63+ojF9Qp/VqUKj
+GfBNQDqZTcwRAan7+TJD21r9uaJAIXYr/SQX8z0bbw==
+=oZEC
+-----END PGP PUBLIC KEY BLOCK-----


[3/7] incubator-omid git commit: Remove IOException from TimestampOracle intf

Posted by fp...@apache.org.
Remove IOException from TimestampOracle intf

The unnecessary exception introduced a several unnecessary boilerplate code.
Also the TimestampOracle interface has been commented properly.

Change-Id: I510e7677f07299061aebfd23f7a8d189e177a4f1


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/484d116a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/484d116a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/484d116a

Branch: refs/heads/master
Commit: 484d116a50324f423b91514a6b4436cc90a13f05
Parents: a8bee71
Author: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Authored: Thu May 12 10:37:25 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 18 12:26:31 2016 -0700

----------------------------------------------------------------------
 .../apache/omid/tso/RequestProcessorImpl.java   | 40 ++++++++------------
 .../org/apache/omid/tso/TimestampOracle.java    | 16 ++++++--
 .../apache/omid/tso/TimestampOracleImpl.java    |  7 ++--
 .../omid/tso/PausableTimestampOracle.java       |  2 +-
 .../java/org/apache/omid/tso/TestPanicker.java  |  8 +---
 .../apache/omid/tso/TestTimestampOracle.java    |  8 +---
 6 files changed, 36 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/484d116a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
index d101c72..0ad5c08 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
@@ -154,15 +154,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
 
     private void handleTimestamp(RequestEvent requestEvent) throws Exception {
 
-        long timestamp;
-
-        try {
-            timestamp = timestampOracle.next();
-        } catch (IOException e) {
-            LOG.error("Error getting timestamp", e);
-            return;
-        }
-
+        long timestamp = timestampOracle.next();
         persistProc.addTimestampToBatch(timestamp, requestEvent.getChannel(), requestEvent.getMonCtx());
 
     }
@@ -195,27 +187,25 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
 
         if (txCanCommit) {
             // 2. commit
-            try {
-                long commitTimestamp = timestampOracle.next();
 
-                if (numCellsInWriteset > 0) {
-                    long newLowWatermark = lowWatermark;
+            long commitTimestamp = timestampOracle.next();
 
-                    for (long r : writeSet) {
-                        long removed = hashmap.putLatestWriteForCell(r, commitTimestamp);
-                        newLowWatermark = Math.max(removed, newLowWatermark);
-                    }
+            if (numCellsInWriteset > 0) {
+                long newLowWatermark = lowWatermark;
 
-                    if (newLowWatermark != lowWatermark) {
-                        LOG.trace("Setting new low Watermark to {}", newLowWatermark);
-                        lowWatermark = newLowWatermark;
-                        persistProc.persistLowWatermark(newLowWatermark); // Async persist
-                    }
+                for (long r : writeSet) {
+                    long removed = hashmap.putLatestWriteForCell(r, commitTimestamp);
+                    newLowWatermark = Math.max(removed, newLowWatermark);
+                }
+
+                if (newLowWatermark != lowWatermark) {
+                    LOG.trace("Setting new low Watermark to {}", newLowWatermark);
+                    lowWatermark = newLowWatermark;
+                    persistProc.persistLowWatermark(newLowWatermark); // Async persist
                 }
-                persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());
-            } catch (IOException e) {
-                LOG.error("Error committing tx {}", startTimestamp, e);
             }
+            persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());
+
         } else { // add it to the aborted list
             persistProc.addAbortToBatch(startTimestamp, isRetry, c, event.getMonCtx());
         }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/484d116a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracle.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracle.java b/tso-server/src/main/java/org/apache/omid/tso/TimestampOracle.java
index f6c6127..3b795ef 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracle.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TimestampOracle.java
@@ -19,16 +19,26 @@ package org.apache.omid.tso;
 
 import java.io.IOException;
 
+/**
+ * Functionality of a service delivering monotonic increasing timestamps.
+ */
 public interface TimestampOracle {
 
+    /**
+     * Allows the initialization of the Timestamp Oracle service.
+     * @throws IOException
+     *          raised if a problem during initialization is shown.
+     */
     void initialize() throws IOException;
 
     /**
-     * Returns the next timestamp if available. Otherwise spins till the
-     * ts-persist thread performs the new timestamp allocation
+     * Returns the next timestamp.
      */
-    long next() throws IOException;
+    long next();
 
+    /**
+     * Returns the last timestamp assigned.
+     */
     long getLast();
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/484d116a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java b/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
index b5bbe4b..0a65c01 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
@@ -34,7 +34,7 @@ import java.util.concurrent.Executors;
 import static org.apache.omid.metrics.MetricsUtils.name;
 
 /**
- * The Timestamp Oracle that gives monotonically increasing timestamps
+ * The Timestamp Oracle that gives monotonically increasing timestamps.
  */
 @Singleton
 public class TimestampOracleImpl implements TimestampOracle {
@@ -129,12 +129,11 @@ public class TimestampOracleImpl implements TimestampOracle {
     }
 
     /**
-     * Returns the next timestamp if available. Otherwise spins till the
-     * ts-persist thread performs the new timestamp allocation
+     * Returns the next timestamp if available. Otherwise spins till the ts-persist thread allocates a new timestamp.
      */
     @SuppressWarnings("StatementWithEmptyBody")
     @Override
-    public long next() throws IOException {
+    public long next() {
         lastTimestamp++;
 
         if (lastTimestamp == nextAllocationThreshold) {

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/484d116a/tso-server/src/test/java/org/apache/omid/tso/PausableTimestampOracle.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/PausableTimestampOracle.java b/tso-server/src/test/java/org/apache/omid/tso/PausableTimestampOracle.java
index 360bbe2..588acfd 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/PausableTimestampOracle.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/PausableTimestampOracle.java
@@ -39,7 +39,7 @@ public class PausableTimestampOracle extends TimestampOracleImpl {
     }
 
     @Override
-    public long next() throws IOException {
+    public long next() {
         while (tsoPaused) {
             synchronized (this) {
                 try {

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/484d116a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
index 2964388..9b8ad29 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
@@ -77,12 +77,8 @@ public class TestPanicker {
         Thread allocThread = new Thread("AllocThread") {
             @Override
             public void run() {
-                try {
-                    while (true) {
-                        tso.next();
-                    }
-                } catch (IOException ioe) {
-                    LOG.error("Shouldn't occur");
+                while (true) {
+                    tso.next();
                 }
             }
         };

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/484d116a/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java b/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
index 501581b..c75e95b 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
@@ -97,12 +97,8 @@ public class TestTimestampOracle {
         Thread allocThread = new Thread("AllocThread") {
             @Override
             public void run() {
-                try {
-                    while (true) {
-                        timestampOracle.next();
-                    }
-                } catch (IOException ioe) {
-                    LOG.error("Shouldn't occur");
+                while (true) {
+                    timestampOracle.next();
                 }
             }
         };


[4/7] incubator-omid git commit: Add test battery for batch re-ordering functionality in reply processor

Posted by fp...@apache.org.
Add test battery for batch re-ordering functionality in reply processor

Change-Id: I7105b1698ee4d1095baabdf844928518da57a4a7


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/fcc56bbc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/fcc56bbc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/fcc56bbc

Branch: refs/heads/master
Commit: fcc56bbc3ac376b8180351e5d7e1890de627acc9
Parents: 8954a34
Author: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Authored: Mon May 16 15:10:22 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 18 12:26:31 2016 -0700

----------------------------------------------------------------------
 .../org/apache/omid/tso/TestReplyProcessor.java | 262 +++++++++++++++++++
 1 file changed, 262 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/fcc56bbc/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
new file mode 100644
index 0000000..d7bebd6
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import org.apache.commons.pool2.ObjectPool;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.metrics.NullMetricsProvider;
+import org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent;
+import org.apache.omid.tso.ReplyProcessorImpl.ReplyBatchEvent;
+import org.jboss.netty.channel.Channel;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertEqualsNoOrder;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+public class TestReplyProcessor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestReplyProcessor.class);
+
+    private static final long ANY_DISRUPTOR_SEQUENCE = 1234L;
+
+    public static final int BATCH_POOL_SIZE = 3;
+
+    private static final long FIRST_ST = 0L;
+    private static final long FIRST_CT = 1L;
+    private static final long SECOND_ST = 2L;
+    private static final long SECOND_CT = 3L;
+    private static final long THIRD_ST = 4L;
+    private static final long THIRD_CT = 5L;
+    private static final long FOURTH_ST = 6L;
+    private static final long FOURTH_CT = 7L;
+    private static final long FIFTH_ST = 8L;
+    private static final long FIFTH_CT = 9L;
+    private static final long SIXTH_ST = 10L;
+
+    @Mock
+    private Panicker panicker;
+
+    private MetricsRegistry metrics;
+
+    private ObjectPool<Batch> batchPool;
+
+    // Component under test
+    private ReplyProcessorImpl replyProcessor;
+
+    @BeforeMethod(alwaysRun = true, timeOut = 30_000)
+    public void initMocksAndComponents() throws Exception {
+
+        MockitoAnnotations.initMocks(this);
+
+        TSOServerConfig tsoConfig = new TSOServerConfig();
+        tsoConfig.setNumConcurrentCTWriters(BATCH_POOL_SIZE);
+
+        // Configure null metrics provider
+        metrics = new NullMetricsProvider();
+
+        batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
+
+        replyProcessor = spy(new ReplyProcessorImpl(metrics, panicker, batchPool));
+
+    }
+
+    @AfterMethod
+    void afterMethod() {
+    }
+
+    @Test(timeOut = 10_000)
+    public void testBadFormedPackageThrowsException() throws Exception {
+
+        // We need an instance throwing exceptions for this test
+        replyProcessor = spy(new ReplyProcessorImpl(metrics, new RuntimeExceptionPanicker(), batchPool));
+
+        // Prepare test batch
+        Batch batch = batchPool.borrowObject();
+        batch.addCommitRetry(FIRST_ST, null, null  );
+        ReplyBatchEvent e = ReplyBatchEvent.EVENT_FACTORY.newInstance();
+        ReplyBatchEvent.makeReplyBatch(e, batch, 0);
+
+        assertEquals(replyProcessor.nextIDToHandle.get(), 0);
+        assertEquals(replyProcessor.futureEvents.size(), 0);
+        assertEquals(batchPool.getNumActive(), 1);
+        assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
+
+        try {
+            replyProcessor.onEvent(e, ANY_DISRUPTOR_SEQUENCE, false);
+            fail();
+        } catch(RuntimeException re) {
+            // Expected
+        }
+
+        assertEquals(replyProcessor.nextIDToHandle.get(), 0);
+        assertEquals(replyProcessor.futureEvents.size(), 0);
+        assertEquals(batchPool.getNumActive(), 1);
+        assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testUnorderedBatchSequenceGetsSaved() throws Exception {
+
+        final long HIGH_SEQUENCE_NUMBER = 1234L; // Should be greater than 0
+
+        // Prepare test batch
+        Batch batch = batchPool.borrowObject();
+        ReplyBatchEvent e = ReplyBatchEvent.EVENT_FACTORY.newInstance();
+        ReplyBatchEvent.makeReplyBatch(e, batch, HIGH_SEQUENCE_NUMBER);
+
+        assertEquals(replyProcessor.nextIDToHandle.get(), 0);
+        assertEquals(replyProcessor.futureEvents.size(), 0);
+        assertEquals(batchPool.getNumActive(), 1);
+        assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
+
+        replyProcessor.onEvent(e, ANY_DISRUPTOR_SEQUENCE, false);
+
+        assertEquals(replyProcessor.nextIDToHandle.get(), 0);
+        assertEquals(replyProcessor.futureEvents.size(), 1);
+        assertEquals(batchPool.getNumActive(), 1);
+        assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
+        assertTrue(batch.isEmpty());
+        verify(replyProcessor, times(0)).handleReplyBatchEvent(any(ReplyBatchEvent.class));
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testProcessingOfEmptyBatchReplyEvent() throws Exception {
+
+        // Prepare test batch
+        Batch batch = batchPool.borrowObject();
+        ReplyBatchEvent e = ReplyBatchEvent.EVENT_FACTORY.newInstance();
+        ReplyBatchEvent.makeReplyBatch(e, batch, 0);
+
+        assertEquals(replyProcessor.nextIDToHandle.get(), 0);
+        assertEquals(replyProcessor.futureEvents.size(), 0);
+        assertEquals(batchPool.getNumActive(), 1);
+        assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
+
+        replyProcessor.onEvent(e, ANY_DISRUPTOR_SEQUENCE, false);
+
+        assertEquals(replyProcessor.nextIDToHandle.get(), 1);
+        assertEquals(replyProcessor.futureEvents.size(), 0);
+        assertEquals(batchPool.getNumActive(), 0);
+        assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE);
+        assertTrue(batch.isEmpty());
+        verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(e));
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testUnorderedSequenceOfBatchReplyEventsThatMustBeOrderedBeforeSendingReplies() throws Exception {
+
+        // Prepare 3 batches with events and simulate a different order of arrival using the batch sequence
+
+        // Prepare first a delayed batch (Batch #3)
+        Batch thirdBatch = batchPool.borrowObject();
+        thirdBatch.addTimestamp(FIRST_ST, mock(Channel.class), mock(MonitoringContext.class));
+        thirdBatch.addCommit(SECOND_ST, SECOND_CT, mock(Channel.class), mock(MonitoringContext.class));
+        ReplyBatchEvent thirdBatchEvent = ReplyBatchEvent.EVENT_FACTORY.newInstance();
+        ReplyBatchEvent.makeReplyBatch(thirdBatchEvent, thirdBatch, 2); // Set a higher sequence than the initial one
+
+        assertEquals(replyProcessor.nextIDToHandle.get(), 0);
+        assertEquals(replyProcessor.futureEvents.size(), 0);
+        assertEquals(batchPool.getNumActive(), 1);
+        assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
+
+        replyProcessor.onEvent(thirdBatchEvent, ANY_DISRUPTOR_SEQUENCE, false);
+
+        assertEquals(replyProcessor.nextIDToHandle.get(), 0);
+        assertEquals(replyProcessor.futureEvents.size(), 1);
+        assertEquals(batchPool.getNumActive(), 1);
+        assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
+        assertFalse(thirdBatch.isEmpty());
+        verify(replyProcessor, never()).handleReplyBatchEvent(eq(thirdBatchEvent));
+
+        // Prepare another delayed batch (Batch #2)
+        Batch secondBatch = batchPool.borrowObject();
+        secondBatch.addTimestamp(THIRD_ST, mock(Channel.class), mock(MonitoringContext.class));
+        secondBatch.addCommit(FOURTH_ST, FOURTH_CT, mock(Channel.class), mock(MonitoringContext.class));
+        ReplyBatchEvent secondBatchEvent = ReplyBatchEvent.EVENT_FACTORY.newInstance();
+        ReplyBatchEvent.makeReplyBatch(secondBatchEvent, secondBatch, 1); // Set another higher sequence
+
+        replyProcessor.onEvent(secondBatchEvent, ANY_DISRUPTOR_SEQUENCE, false);
+
+        assertEquals(replyProcessor.nextIDToHandle.get(), 0);
+        assertEquals(replyProcessor.futureEvents.size(), 2);
+        assertEquals(batchPool.getNumActive(), 2);
+        assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 2);
+        assertFalse(secondBatch.isEmpty());
+        assertFalse(thirdBatch.isEmpty());
+
+        // Finally, prepare the batch that should trigger the execution of the other two
+        Batch firstBatch = batchPool.borrowObject();
+        firstBatch.addAbort(FIFTH_ST, mock(Channel.class), mock(MonitoringContext.class));
+        ReplyBatchEvent firstBatchEvent = ReplyBatchEvent.EVENT_FACTORY.newInstance();
+        ReplyBatchEvent.makeReplyBatch(firstBatchEvent, firstBatch, 0); // Set the first batch with a higher sequence
+
+        replyProcessor.onEvent(firstBatchEvent, ANY_DISRUPTOR_SEQUENCE, false);
+
+        assertEquals(replyProcessor.nextIDToHandle.get(), 3);
+        assertEquals(replyProcessor.futureEvents.size(), 0);
+        assertEquals(batchPool.getNumActive(), 0);
+        assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE);
+        assertTrue(firstBatch.isEmpty());
+        assertTrue(secondBatch.isEmpty());
+        assertTrue(thirdBatch.isEmpty());
+
+        // Check the method calls have been properly ordered
+
+        InOrder inOrderReplyBatchEvents = inOrder(replyProcessor, replyProcessor, replyProcessor);
+        inOrderReplyBatchEvents.verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(firstBatchEvent));
+        inOrderReplyBatchEvents.verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(secondBatchEvent));
+        inOrderReplyBatchEvents.verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(thirdBatchEvent));
+
+        InOrder inOrderReplies = inOrder(replyProcessor, replyProcessor, replyProcessor, replyProcessor, replyProcessor);
+        inOrderReplies.verify(replyProcessor, times(1)).sendAbortResponse(eq(FIFTH_ST), any(Channel.class));
+        inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(THIRD_ST), any(Channel.class));
+        inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(FOURTH_ST), eq(FOURTH_CT), any(Channel.class));
+        inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(FIRST_ST), any(Channel.class));
+        inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(SECOND_ST), eq(SECOND_CT), any(Channel.class));
+
+    }
+
+}


[5/7] incubator-omid git commit: Fix bug in retry request management that didn't return batches to pool

Posted by fp...@apache.org.
Fix bug in retry request management that didn't return batches to pool

Add new complete tests for PersistentProcessorHandler component.
Also improve readability.

Change-Id: Ied72d420bb229c3466b7b89cf14b0756c93b2e31


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/3659b96c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/3659b96c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/3659b96c

Branch: refs/heads/master
Commit: 3659b96c5a775bd0f9bfbf9b65a67e5e7f0c6da2
Parents: 484d116
Author: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Authored: Fri May 13 14:08:09 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 18 12:26:31 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/omid/tso/Batch.java    |  49 +-
 .../java/org/apache/omid/tso/PersistEvent.java  |  12 +-
 .../apache/omid/tso/PersistenceProcessor.java   |   2 +-
 .../omid/tso/PersistenceProcessorHandler.java   | 107 ++--
 .../omid/tso/PersistenceProcessorImpl.java      |   4 +-
 .../org/apache/omid/tso/ReplyProcessorImpl.java |  39 +-
 .../apache/omid/tso/RequestProcessorImpl.java   |  13 +-
 .../tso/TestPersistenceProcessorHandler.java    | 488 +++++++++++++++++++
 8 files changed, 607 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/main/java/org/apache/omid/tso/Batch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/Batch.java b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
index c5ed696..06ddc71 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/Batch.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
@@ -35,11 +35,11 @@ public class Batch {
     private final int id;
     private final int size;
     private int numEvents;
-    private final PersistEvent[] events;
+    private final PersistEvent[] events; // TODO Check if it's worth to have a dynamic structure for this
 
     Batch(int id, int size) {
 
-        Preconditions.checkArgument(size > 0, "Size must be positive");
+        Preconditions.checkArgument(size > 0, "Size [%s] must be positive", size);
         this.size = size;
         this.id = id;
         this.numEvents = 0;
@@ -51,41 +51,46 @@ public class Batch {
 
     }
 
-    boolean isFull() {
-
-        Preconditions.checkState(numEvents <= size, "numEvents > size");
-        return numEvents == size;
+    PersistEvent get(int idx) {
+        Preconditions.checkState(numEvents > 0 && 0 <= idx && idx < numEvents,
+                                 "Accessing Events array (Size = %s) with wrong index [%s]", numEvents, idx);
+        return events[idx];
+    }
 
+    void set(int idx, PersistEvent event) {
+        Preconditions.checkState(0 <= idx && idx < numEvents);
+        events[idx] = event;
     }
 
-    boolean isEmpty() {
+    void clear() {
 
-        return numEvents == 0;
+        numEvents = 0;
 
     }
 
+    void decreaseNumEvents() {
+        numEvents--;
+    }
+
     int getNumEvents() {
         return numEvents;
     }
 
-    void clear() {
+    int getLastEventIdx() {
+        return numEvents - 1;
+    }
 
-        numEvents = 0;
+    boolean isFull() {
 
-    }
+        Preconditions.checkState(numEvents <= size, "Batch Full: numEvents [%s] > size [%s]", numEvents, size);
+        return numEvents == size;
 
-    PersistEvent get(int idx) {
-        Preconditions.checkState(numEvents > 0 && 0 <= idx && idx < numEvents);
-        return events[idx];
     }
 
-    void set(int idx, PersistEvent event) {
-        Preconditions.checkState(0 <= idx && idx < numEvents);
-        events[idx] = event;
-    }
+    boolean isEmpty() {
+
+        return numEvents == 0;
 
-    void decreaseNumEvents() {
-        numEvents--;
     }
 
     void addCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context) {
@@ -96,11 +101,11 @@ public class Batch {
 
     }
 
-    void addAbort(long startTimestamp, boolean isRetry, Channel c, MonitoringContext context) {
+    void addAbort(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext context) {
         Preconditions.checkState(!isFull(), "batch is full");
         int index = numEvents++;
         PersistEvent e = events[index];
-        e.makePersistAbort(startTimestamp, isRetry, c, context);
+        e.makePersistAbort(startTimestamp, isCommitRetry, c, context);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
index 8d4fd85..9bf9e89 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
@@ -31,7 +31,7 @@ public final class PersistEvent {
     private Type type = null;
     private Channel channel = null;
 
-    private boolean isRetry = false;
+    private boolean isCommitRetry = false;
     private long startTimestamp = 0L;
     private long commitTimestamp = 0L;
     private long lowWatermark = 0L;
@@ -46,11 +46,11 @@ public final class PersistEvent {
 
     }
 
-    void makePersistAbort(long startTimestamp, boolean isRetry, Channel c, MonitoringContext monCtx) {
+    void makePersistAbort(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext monCtx) {
 
         this.type = Type.ABORT;
         this.startTimestamp = startTimestamp;
-        this.isRetry = isRetry;
+        this.isCommitRetry = isCommitRetry;
         this.channel = c;
         this.monCtx = monCtx;
 
@@ -83,9 +83,9 @@ public final class PersistEvent {
 
     }
 
-    boolean isRetry() {
+    boolean isCommitRetry() {
 
-        return isRetry;
+        return isCommitRetry;
 
     }
 
@@ -111,7 +111,7 @@ public final class PersistEvent {
     public String toString() {
         return Objects.toStringHelper(this)
                 .add("type", type)
-                .add("isRetry", isRetry)
+                .add("isCommitRetry", isCommitRetry)
                 .add("ST", startTimestamp)
                 .add("CT", commitTimestamp)
                 .add("LWM", lowWatermark)

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
index f4e4a51..0b1a34f 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
@@ -26,7 +26,7 @@ interface PersistenceProcessor {
     void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
             throws Exception;
 
-    void addAbortToBatch(long startTimestamp, boolean isRetry, Channel c, MonitoringContext monCtx) throws Exception;
+    void addAbortToBatch(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext monCtx) throws Exception;
 
     void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
index cf7557a..079c8c4 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
@@ -39,12 +39,13 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
     private final LeaseManagement leaseManager;
 
     private final ReplyProcessor replyProcessor;
-    private final RetryProcessor retryProc;
+    private final RetryProcessor retryProcessor;
     private final CommitTable.Writer writer;
     final Panicker panicker;
 
     private final Timer flushTimer;
     private final Histogram batchSizeHistogram;
+    private final Histogram flushedCommitEventsHistogram;
 
     @Inject
     PersistenceProcessorHandler(MetricsRegistry metrics,
@@ -52,7 +53,7 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
                                 LeaseManagement leaseManager,
                                 CommitTable commitTable,
                                 ReplyProcessor replyProcessor,
-                                RetryProcessor retryProc,
+                                RetryProcessor retryProcessor,
                                 Panicker panicker)
     throws InterruptedException, ExecutionException, IOException {
 
@@ -60,54 +61,63 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
         this.leaseManager = leaseManager;
         this.writer = commitTable.getWriter();
         this.replyProcessor = replyProcessor;
-        this.retryProc = retryProc;
+        this.retryProcessor = retryProcessor;
         this.panicker = panicker;
 
-        flushTimer = metrics.timer(name("tso", "persist", "flush"));
-        batchSizeHistogram = metrics.histogram(name("tso", "persist", "batchsize"));
+        // Metrics in this component
+        flushTimer = metrics.timer(name("tso", "persist", "flush", "latency"));
+        flushedCommitEventsHistogram = metrics.histogram(name("tso", "persist", "flushed", "commits", "size"));
+        batchSizeHistogram = metrics.histogram(name("tso", "persist", "batch", "size"));
 
     }
 
     @Override
-    public void onEvent(PersistenceProcessorImpl.PersistBatchEvent event) throws Exception {
+    public void onEvent(PersistenceProcessorImpl.PersistBatchEvent batchEvent) throws Exception {
 
-        Batch batch = event.getBatch();
-        for (int i=0; i < batch.getNumEvents(); ++i) {
-            PersistEvent localEvent = batch.get(i);
+        int commitEventsToFlush = 0;
+        Batch batch = batchEvent.getBatch();
+        int numOfBatchedEvents = batch.getNumEvents();
+        batchSizeHistogram.update(numOfBatchedEvents);
+        for (int i=0; i < numOfBatchedEvents; ++i) {
+            PersistEvent event = batch.get(i);
 
-            switch (localEvent.getType()) {
+            switch (event.getType()) {
             case COMMIT:
-                localEvent.getMonCtx().timerStart("commitPersistProcessor");
+                event.getMonCtx().timerStart("commitPersistProcessor");
                 // TODO: What happens when the IOException is thrown?
-                writer.addCommittedTransaction(localEvent.getStartTimestamp(), localEvent.getCommitTimestamp());
+                writer.addCommittedTransaction(event.getStartTimestamp(), event.getCommitTimestamp());
+                commitEventsToFlush++;
                 break;
             case ABORT:
                 break;
             case TIMESTAMP:
-                localEvent.getMonCtx().timerStart("timestampPersistProcessor");
+                event.getMonCtx().timerStart("timestampPersistProcessor");
                 break;
-            default:
-                throw new RuntimeException("Unknown event type: " + localEvent.getType().name());
             }
         }
-        if (batch.getNumEvents() > 0) {
-            flush(batch.getNumEvents());
-            sendReplies(batch, event.getBatchSequence());
-        }
+
+        // Flush and send the responses back to the client. WARNING: Before sending the responses, first we need
+        // to filter commit retries in the batch to disambiguate them.
+        flush(commitEventsToFlush);
+        filterAndDissambiguateClientRetries(batch);
+        replyProcessor.manageResponsesBatch(batchEvent.getBatchSequence(), batch);
+
     }
 
-    private void flush(int numBatchedEvents) {
+    void flush(int commitEventsToFlush) {
 
-            commitSuicideIfNotMaster();
-            try {
-                long startFlushTimeInNs = System.nanoTime();
+        commitSuicideIfNotMaster();
+        try {
+            long startFlushTimeInNs = System.nanoTime();
+            if(commitEventsToFlush > 0) {
                 writer.flush();
-                flushTimer.update(System.nanoTime() - startFlushTimeInNs);
-                batchSizeHistogram.update(numBatchedEvents);
-            } catch (IOException e) {
-                panicker.panic("Error persisting commit batch", e);
             }
-            commitSuicideIfNotMaster(); // TODO Here, we can return the client responses before committing suicide
+            flushTimer.update(System.nanoTime() - startFlushTimeInNs);
+            flushedCommitEventsHistogram.update(commitEventsToFlush);
+        } catch (IOException e) {
+            panicker.panic("Error persisting commit batch", e);
+        }
+        commitSuicideIfNotMaster();
 
     }
 
@@ -117,30 +127,33 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
         }
     }
 
-    private void sendReplies(Batch batch, long batchSequence) {
-
-        int i = 0;
-        while (i < batch.getNumEvents()) {
-            PersistEvent e = batch.get(i);
-            if (e.getType() == PersistEvent.Type.ABORT && e.isRetry()) {
-                retryProc.disambiguateRetryRequestHeuristically(e.getStartTimestamp(), e.getChannel(), e.getMonCtx());
-                PersistEvent tmp = batch.get(i);
-                //TODO: why assign it?
-                batch.set(i, batch.get(batch.getNumEvents() - 1));
-                batch.set(batch.getNumEvents()  - 1, tmp);
-                if (batch.getNumEvents()  == 1) {
-                    batch.clear();
-                    replyProcessor.manageResponsesBatch(batchSequence, null);
-                    return;
-                }
+    void filterAndDissambiguateClientRetries(Batch batch) {
+
+        int currentEventIdx = 0;
+        while (currentEventIdx <= batch.getLastEventIdx()) {
+            PersistEvent event = batch.get(currentEventIdx);
+            if (event.isCommitRetry()) {
+                retryProcessor.disambiguateRetryRequestHeuristically(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
+                // Swap the disambiguated event with the last batch event & decrease the # of remaining elems to process
+                swapBatchElements(batch, currentEventIdx, batch.getLastEventIdx());
                 batch.decreaseNumEvents();
-                continue;
+                if (batch.isEmpty()) {
+                    break; // We're OK to call now the reply processor
+                } else {
+                    continue; // Otherwise we continue checking for retries from the new event in the current position
+                }
+            } else {
+                currentEventIdx++; // Let's check if the next event was a retry
             }
-            i++;
         }
 
-        replyProcessor.manageResponsesBatch(batchSequence, batch);
+    }
 
+    private void swapBatchElements(Batch batch, int firstIdx, int lastIdx) {
+        PersistEvent tmpEvent = batch.get(firstIdx);
+        PersistEvent lastEventInBatch = batch.get(lastIdx);
+        batch.set(firstIdx, lastEventInBatch);
+        batch.set(lastIdx, tmpEvent);
     }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index 761d30b..0da0c91 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -128,10 +128,10 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
     }
 
     @Override
-    public void addAbortToBatch(long startTimestamp, boolean isRetry, Channel c, MonitoringContext context)
+    public void addAbortToBatch(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext context)
             throws Exception {
 
-        currentBatch.addAbort(startTimestamp, isRetry, c, context);
+        currentBatch.addAbort(startTimestamp, isCommitRetry, c, context);
         if (currentBatch.isFull()) {
             triggerCurrentBatchFlush();
         }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index c1709ef..91df214 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -90,42 +90,37 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
 
     }
 
-    private void handleReplyBatchEvent(ReplyBatchEvent event) throws Exception {
+    private void handleReplyBatchEvent(ReplyBatchEvent replyBatchEvent) throws Exception {
 
         String name;
-        Batch batch = event.getBatch();
-        for (int i = 0; batch != null && i < batch.getNumEvents(); ++i) {
-            PersistEvent localEvent = batch.get(i);
+        Batch batch = replyBatchEvent.getBatch();
+        for (int i = 0; i < batch.getNumEvents(); i++) {
+            PersistEvent event = batch.get(i);
 
-            switch (localEvent.getType()) {
+            switch (event.getType()) {
             case COMMIT:
                 name = "commitReplyProcessor";
-                localEvent.getMonCtx().timerStart(name);
-                sendCommitResponse(localEvent.getStartTimestamp(), localEvent.getCommitTimestamp(), localEvent.getChannel());
-                localEvent.getMonCtx().timerStop(name);
-                 break;
+                event.getMonCtx().timerStart(name);
+                sendCommitResponse(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel());
+                event.getMonCtx().timerStop(name);
+                break;
             case ABORT:
                 name = "abortReplyProcessor";
-                localEvent.getMonCtx().timerStart(name);
-                sendAbortResponse(localEvent.getStartTimestamp(), localEvent.getChannel());
-                localEvent.getMonCtx().timerStop(name);
+                event.getMonCtx().timerStart(name);
+                sendAbortResponse(event.getStartTimestamp(), event.getChannel());
+                event.getMonCtx().timerStop(name);
                 break;
             case TIMESTAMP:
                 name = "timestampReplyProcessor";
-                localEvent.getMonCtx().timerStart(name);
-                sendTimestampResponse(localEvent.getStartTimestamp(), localEvent.getChannel());
-                localEvent.getMonCtx().timerStop(name);
-                break;
-            default:
-                LOG.error("Unknown event {}", localEvent.getType());
+                event.getMonCtx().timerStart(name);
+                sendTimestampResponse(event.getStartTimestamp(), event.getChannel());
+                event.getMonCtx().timerStop(name);
                 break;
             }
-            localEvent.getMonCtx().publish();
+            event.getMonCtx().publish();
         }
 
-        if (batch != null) {
-            batchPool.returnObject(batch);
-        }
+        batchPool.returnObject(batch);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
index 0ad5c08..372733e 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
@@ -38,7 +38,6 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestEvent>, RequestProcessor, TimeoutHandler {
@@ -163,7 +162,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
 
         long startTimestamp = event.getStartTimestamp();
         Iterable<Long> writeSet = event.writeSet();
-        boolean isRetry = event.isRetry();
+        boolean isCommitRetry = event.isCommitRetry();
         Channel c = event.getChannel();
 
         boolean txCanCommit;
@@ -207,7 +206,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
             persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());
 
         } else { // add it to the aborted list
-            persistProc.addAbortToBatch(startTimestamp, isRetry, c, event.getMonCtx());
+            persistProc.addAbortToBatch(startTimestamp, isCommitRetry, c, event.getMonCtx());
         }
 
     }
@@ -221,7 +220,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
         private Type type = null;
         private Channel channel = null;
 
-        private boolean isRetry = false;
+        private boolean isCommitRetry = false;
         private long startTimestamp = 0;
         private MonitoringContext monCtx;
         private long numCells = 0;
@@ -246,7 +245,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
             e.type = Type.COMMIT;
             e.channel = c;
             e.startTimestamp = startTimestamp;
-            e.isRetry = isRetry;
+            e.isCommitRetry = isRetry;
             if (writeSet.size() > MAX_INLINE) {
                 e.numCells = writeSet.size();
                 e.writeSetAsCollection = writeSet;
@@ -315,8 +314,8 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
 
         }
 
-        boolean isRetry() {
-            return isRetry;
+        boolean isCommitRetry() {
+            return isCommitRetry;
         }
 
         final static EventFactory<RequestEvent> EVENT_FACTORY = new EventFactory<RequestEvent>() {

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
new file mode 100644
index 0000000..4dcc2b0
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.metrics.NullMetricsProvider;
+import org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent;
+import org.jboss.netty.channel.Channel;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+public class TestPersistenceProcessorHandler {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestPersistenceProcessorHandler.class);
+
+    private static final int BATCH_ID = 0;
+    private static final int BATCH_SIZE = 6;
+    private static final long BATCH_SEQUENCE = 0;
+
+    private static final long FIRST_ST = 0L;
+    private static final long FIRST_CT = 1L;
+    private static final long SECOND_ST = 2L;
+    private static final long SECOND_CT = 3L;
+    private static final long THIRD_ST = 4L;
+    private static final long THIRD_CT = 5L;
+    private static final long FOURTH_ST = 6L;
+    private static final long FOURTH_CT = 7L;
+    private static final long FIFTH_ST = 8L;
+    private static final long FIFTH_CT = 9L;
+    private static final long SIXTH_ST = 10L;
+
+    @Mock
+    private CommitTable.Writer mockWriter;
+    @Mock
+    private CommitTable.Client mockClient;
+    @Mock
+    private LeaseManager leaseManager;
+    @Mock
+    private ReplyProcessor replyProcessor;
+    @Mock
+    private RetryProcessor retryProcessor;
+    @Mock
+    private Panicker panicker;
+
+    private CommitTable commitTable;
+
+    private MetricsRegistry metrics;
+
+    // Component under test
+    private PersistenceProcessorHandler persistenceHandler;
+
+    @BeforeMethod(alwaysRun = true, timeOut = 30_000)
+    public void initMocksAndComponents() throws Exception {
+
+        MockitoAnnotations.initMocks(this);
+
+        // Configure null metrics provider
+        metrics = new NullMetricsProvider();
+
+        // Configure commit table to return the mocked writer and client
+        commitTable = new CommitTable() {
+            @Override
+            public Writer getWriter() {
+                return mockWriter;
+            }
+
+            @Override
+            public Client getClient() {
+                return mockClient;
+            }
+        };
+
+        // Simulate we're master for most of the tests
+        doReturn(true).when(leaseManager).stillInLeasePeriod();
+
+        persistenceHandler = spy(new PersistenceProcessorHandler(metrics,
+                                                                 "localhost:1234",
+                                                                 leaseManager,
+                                                                 commitTable,
+                                                                 replyProcessor,
+                                                                 retryProcessor,
+                                                                 panicker));
+
+    }
+
+    @AfterMethod
+    void afterMethod() {
+        Mockito.reset(mockWriter);
+    }
+
+    @Test(timeOut = 10_000)
+    public void testProcessingOfEmptyBatchPersistEvent() throws Exception {
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+        persistenceHandler.onEvent(batchEvent);
+
+        verify(persistenceHandler, times(1)).flush(eq(0));
+        verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+        assertTrue(batch.isEmpty());
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testProcessingOfBatchPersistEventWithASingleTimestampEvent() throws Exception {
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addTimestamp(FIRST_ST, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+        persistenceHandler.onEvent(batchEvent);
+
+        verify(persistenceHandler, times(1)).flush(eq(0));
+        verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+        assertEquals(batch.getNumEvents(), 1);
+        assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testProcessingOfBatchPersistEventWithASingleCommitEvent() throws Exception {
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+        persistenceHandler.onEvent(batchEvent);
+
+        verify(persistenceHandler, times(1)).flush(eq(1));
+        verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+        assertEquals(batch.getNumEvents(), 1);
+        assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
+        assertEquals(batch.get(0).getCommitTimestamp(), FIRST_CT);
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testProcessingOfBatchPersistEventWithASingleAbortEventNoRetry() throws Exception {
+
+        final boolean IS_RETRY = false;
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+        persistenceHandler.onEvent(batchEvent);
+
+        verify(persistenceHandler, times(1)).flush(eq(0));
+        verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+        assertEquals(batch.getNumEvents(), 1);
+        assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testProcessingOfBatchPersistEventWithASingleAbortEventWithRetry() throws Exception {
+
+        final boolean IS_RETRY = true;
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+        // Call process method
+        persistenceHandler.onEvent(batchEvent);
+
+        verify(persistenceHandler, times(1)).flush(eq(0));
+        verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+        assertEquals(batch.getNumEvents(), 0);
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testProcessingOfBatchPersistEventWith2EventsCommitAndAbortWithRetry() throws Exception {
+
+        final boolean IS_RETRY = true;
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        batch.addAbort(SECOND_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+        // Initial assertion
+        assertEquals(batch.getNumEvents(), 2);
+
+        // Call process method
+        persistenceHandler.onEvent(batchEvent);
+
+        verify(persistenceHandler, times(1)).flush(eq(1));
+        verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+        assertEquals(batch.getNumEvents(), 1);
+        assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
+        assertEquals(batch.get(0).getCommitTimestamp(), FIRST_CT);
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testProcessingOfBatchPersistEventWith2EventsAbortWithRetryAndCommit() throws Exception {
+        // ------------------------------------------------------------------------------------------------------------
+        // Same test as testProcessingOfBatchPersistEventWith2EventsCommitAndAbortWithRetry but swapped events
+        // ------------------------------------------------------------------------------------------------------------
+
+        final boolean IS_RETRY = true;
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+        // Initial assertion
+        assertEquals(batch.getNumEvents(), 2);
+
+        // Call process method
+        persistenceHandler.onEvent(batchEvent);
+
+        verify(persistenceHandler, times(1)).flush(eq(1));
+        verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+        assertEquals(batch.getNumEvents(), 1);
+        assertEquals(batch.get(0).getStartTimestamp(), SECOND_ST);
+        assertEquals(batch.get(0).getCommitTimestamp(), SECOND_CT);
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testProcessingOfBatchPersistEventWith2AbortWithRetryEvents() throws Exception {
+
+        final boolean IS_RETRY = true;
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        batch.addAbort(SECOND_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+        // Initial assertion
+        assertEquals(batch.getNumEvents(), 2);
+
+        // Call process method
+        persistenceHandler.onEvent(batchEvent);
+
+        verify(persistenceHandler, times(1)).flush(eq(0));
+        verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+        assertEquals(batch.getNumEvents(), 0);
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testProcessingOfBatchPersistEventWith2NonRetryAbortEvents() throws Exception {
+
+        final boolean IS_RETRY = false;
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        batch.addAbort(SECOND_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+        // Initial assertion
+        assertEquals(batch.getNumEvents(), 2);
+
+        // Call process method
+        persistenceHandler.onEvent(batchEvent);
+
+        verify(persistenceHandler, times(1)).flush(eq(0));
+        verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
+        verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+        assertEquals(batch.getNumEvents(), 2);
+        assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
+        assertEquals(batch.get(1).getStartTimestamp(), SECOND_ST);
+
+    }
+
+
+    @Test(timeOut = 10_000)
+    public void testProcessingOfBatchPersistEventWithMultipleRetryAndNonRetryEvents() throws Exception {
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+
+        batch.addTimestamp(FIRST_ST, null, mock(MonitoringContext.class));
+        batch.addAbort(SECOND_ST, true, null, mock(MonitoringContext.class));
+        batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContext.class));
+        batch.addAbort(FOURTH_ST, false, null, mock(MonitoringContext.class));
+        batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContext.class));
+        batch.addAbort(SIXTH_ST, true, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+        // Initial assertion
+        assertEquals(batch.getNumEvents(), 6);
+
+        // Call process method
+        persistenceHandler.onEvent(batchEvent);
+
+        verify(persistenceHandler, times(1)).flush(2); // 2 commits to flush
+        verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
+        verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+        assertEquals(batch.getNumEvents(), 4);
+        assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
+        assertEquals(batch.get(1).getStartTimestamp(), FIFTH_ST);
+        assertEquals(batch.get(1).getCommitTimestamp(), FIFTH_CT);
+        assertEquals(batch.get(2).getStartTimestamp(), THIRD_ST);
+        assertEquals(batch.get(2).getCommitTimestamp(), THIRD_CT);
+        assertEquals(batch.get(3).getStartTimestamp(), FOURTH_ST);
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testPanicPersistingEvents() throws Exception {
+
+        // User the real panicker
+        Panicker panicker = spy(new RuntimeExceptionPanicker());
+        persistenceHandler = spy(new PersistenceProcessorHandler(metrics,
+                                                                 "localhost:1234",
+                                                                 leaseManager,
+                                                                 commitTable,
+                                                                 replyProcessor,
+                                                                 retryProcessor,
+                                                                 panicker));
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+        doThrow(IOException.class).when(mockWriter).flush();
+
+        try {
+            persistenceHandler.onEvent(batchEvent);
+            fail();
+        } catch (RuntimeException re) {
+            // Expected
+        }
+
+        verify(persistenceHandler, times(1)).flush(1);
+        verify(panicker, times(1)).panic(eq("Error persisting commit batch"), any(IOException.class));
+        verify(persistenceHandler, never()).filterAndDissambiguateClientRetries(any(Batch.class));
+        verify(replyProcessor, never()).manageResponsesBatch(anyLong(), any(Batch.class));
+
+    }
+
+    @Test(timeOut = 10_000)
+    public void testPanicBecauseMasterLosesMastership() throws Exception {
+
+        // ------------------------------------------------------------------------------------------------------------
+        // 1) Test panic before flushing
+        // ------------------------------------------------------------------------------------------------------------
+
+        // Simulate we lose mastership BEFORE flushing
+        doReturn(false).when(leaseManager).stillInLeasePeriod();
+
+        // User the real panicker
+        Panicker panicker = spy(new RuntimeExceptionPanicker());
+        persistenceHandler = spy(new PersistenceProcessorHandler(metrics,
+                                                                 "localhost:1234",
+                                                                 leaseManager,
+                                                                 commitTable,
+                                                                 replyProcessor,
+                                                                 retryProcessor,
+                                                                 panicker));
+
+        // Prepare test batch
+        Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        PersistBatchEvent batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+        try {
+            persistenceHandler.onEvent(batchEvent);
+            fail();
+        } catch (RuntimeException re) {
+            // Expected
+        }
+        verify(persistenceHandler, times(1)).flush(eq(1));
+        verify(mockWriter, never()).flush();
+        verify(panicker, times(1)).panic(eq("Replica localhost:1234 lost mastership whilst flushing data. Committing suicide"), any(IOException.class));
+        verify(persistenceHandler, never()).filterAndDissambiguateClientRetries(any(Batch.class));
+        verify(replyProcessor, never()).manageResponsesBatch(anyLong(), any(Batch.class));
+
+        // ------------------------------------------------------------------------------------------------------------
+        // 2) Test panic after flushing
+        // ------------------------------------------------------------------------------------------------------------
+
+        // Simulate we lose mastership AFTER flushing
+        doReturn(true).doReturn(false).when(leaseManager).stillInLeasePeriod();
+
+        // User the real panicker
+        panicker = spy(new RuntimeExceptionPanicker());
+        persistenceHandler = spy(new PersistenceProcessorHandler(metrics,
+                                                                 "localhost:1234",
+                                                                 leaseManager,
+                                                                 commitTable,
+                                                                 replyProcessor,
+                                                                 retryProcessor,
+                                                                 panicker));
+
+        // Prepare test batch
+        batch = new Batch(BATCH_ID, BATCH_SIZE);
+        batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+        batchEvent = new PersistBatchEvent();
+        PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+        try {
+            persistenceHandler.onEvent(batchEvent);
+            fail();
+        } catch (RuntimeException re) {
+            // Expected
+        }
+        verify(persistenceHandler, times(1)).flush(eq(1));
+        verify(mockWriter, times(1)).flush();
+        verify(panicker, times(1)).panic(eq("Replica localhost:1234 lost mastership whilst flushing data. Committing suicide"), any(IOException.class));
+        verify(persistenceHandler, never()).filterAndDissambiguateClientRetries(any(Batch.class));
+        verify(replyProcessor, never()).manageResponsesBatch(anyLong(), any(Batch.class));
+
+    }
+
+}



[6/7] incubator-omid git commit: Separate commit retry events from aborts in batch contents

Posted by fp...@apache.org.
Separate commit retry events from aborts in batch contents

Before, the Abort events were re-used for holding what in reality are
commit retries, what made the code criptic in many places. Now commit
retries are treated as first class events.

Change-Id: Ib9c9f424c3b7b94a02d5e985b388ef74f13c87da


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/8954a342
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/8954a342
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/8954a342

Branch: refs/heads/master
Commit: 8954a3421803b70aa25de2d8ee574ca4f3ed05e4
Parents: 3659b96
Author: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Authored: Mon May 16 15:07:59 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 18 12:26:31 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/omid/tso/Batch.java    | 13 +++++-
 .../java/org/apache/omid/tso/PersistEvent.java  | 30 +++++--------
 .../apache/omid/tso/PersistenceProcessor.java   |  5 ++-
 .../omid/tso/PersistenceProcessorHandler.java   |  3 +-
 .../omid/tso/PersistenceProcessorImpl.java      | 12 ++++-
 .../org/apache/omid/tso/ReplyProcessorImpl.java | 12 +++--
 .../apache/omid/tso/RequestProcessorImpl.java   | 10 ++++-
 .../java/org/apache/omid/tso/TestBatch.java     | 16 ++++---
 .../tso/TestPersistenceProcessorHandler.java    | 46 ++++++++------------
 .../apache/omid/tso/TestRequestProcessor.java   |  6 +--
 10 files changed, 84 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/Batch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/Batch.java b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
index 06ddc71..496bca2 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/Batch.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
@@ -101,11 +101,20 @@ public class Batch {
 
     }
 
-    void addAbort(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext context) {
+    public void addCommitRetry(long startTimestamp, Channel c, MonitoringContext context) {
+
+        Preconditions.checkState(!isFull(), "batch is full");
+        int index = numEvents++;
+        PersistEvent e = events[index];
+        e.makeCommitRetry(startTimestamp, c, context);
+
+    }
+
+    void addAbort(long startTimestamp, Channel c, MonitoringContext context) {
         Preconditions.checkState(!isFull(), "batch is full");
         int index = numEvents++;
         PersistEvent e = events[index];
-        e.makePersistAbort(startTimestamp, isCommitRetry, c, context);
+        e.makePersistAbort(startTimestamp, c, context);
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
index 9bf9e89..db58677 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
@@ -25,16 +25,14 @@ public final class PersistEvent {
     private MonitoringContext monCtx;
 
     enum Type {
-        TIMESTAMP, COMMIT, ABORT
+        TIMESTAMP, COMMIT, ABORT, COMMIT_RETRY
     }
 
     private Type type = null;
     private Channel channel = null;
 
-    private boolean isCommitRetry = false;
     private long startTimestamp = 0L;
     private long commitTimestamp = 0L;
-    private long lowWatermark = 0L;
 
     void makePersistCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
 
@@ -46,11 +44,19 @@ public final class PersistEvent {
 
     }
 
-    void makePersistAbort(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext monCtx) {
+    void makeCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) {
+
+        this.type = Type.COMMIT_RETRY;
+        this.startTimestamp = startTimestamp;
+        this.channel = c;
+        this.monCtx = monCtx;
+
+    }
+
+    void makePersistAbort(long startTimestamp, Channel c, MonitoringContext monCtx) {
 
         this.type = Type.ABORT;
         this.startTimestamp = startTimestamp;
-        this.isCommitRetry = isCommitRetry;
         this.channel = c;
         this.monCtx = monCtx;
 
@@ -83,12 +89,6 @@ public final class PersistEvent {
 
     }
 
-    boolean isCommitRetry() {
-
-        return isCommitRetry;
-
-    }
-
     long getStartTimestamp() {
 
         return startTimestamp;
@@ -101,20 +101,12 @@ public final class PersistEvent {
 
     }
 
-    long getLowWatermark() {
-
-        return lowWatermark;
-
-    }
-
     @Override
     public String toString() {
         return Objects.toStringHelper(this)
                 .add("type", type)
-                .add("isCommitRetry", isCommitRetry)
                 .add("ST", startTimestamp)
                 .add("CT", commitTimestamp)
-                .add("LWM", lowWatermark)
                 .toString();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
index 0b1a34f..07893f6 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
@@ -26,12 +26,13 @@ interface PersistenceProcessor {
     void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
             throws Exception;
 
-    void addAbortToBatch(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext monCtx) throws Exception;
+    void addCommitRetryToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
+
+    void addAbortToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
 
     void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
 
     void triggerCurrentBatchFlush() throws Exception;
 
     Future<Void> persistLowWatermark(long lowWatermark);
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
index 079c8c4..bfe0036 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 
 import static com.codahale.metrics.MetricRegistry.name;
+import static org.apache.omid.tso.PersistEvent.Type.*;
 
 public class PersistenceProcessorHandler implements WorkHandler<PersistenceProcessorImpl.PersistBatchEvent> {
 
@@ -132,7 +133,7 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
         int currentEventIdx = 0;
         while (currentEventIdx <= batch.getLastEventIdx()) {
             PersistEvent event = batch.get(currentEventIdx);
-            if (event.isCommitRetry()) {
+            if (event.getType() == COMMIT_RETRY) {
                 retryProcessor.disambiguateRetryRequestHeuristically(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
                 // Swap the disambiguated event with the last batch event & decrease the # of remaining elems to process
                 swapBatchElements(batch, currentEventIdx, batch.getLastEventIdx());

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index 0da0c91..4fe21d2 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -128,10 +128,18 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
     }
 
     @Override
-    public void addAbortToBatch(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext context)
+    public void addCommitRetryToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+        currentBatch.addCommitRetry(startTimestamp, c, monCtx);
+        if (currentBatch.isFull()) {
+            triggerCurrentBatchFlush();
+        }
+    }
+
+    @Override
+    public void addAbortToBatch(long startTimestamp, Channel c, MonitoringContext context)
             throws Exception {
 
-        currentBatch.addAbort(startTimestamp, isCommitRetry, c, context);
+        currentBatch.addAbort(startTimestamp, c, context);
         if (currentBatch.isFull()) {
             triggerCurrentBatchFlush();
         }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index 91df214..a46c240 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.omid.tso;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
 import com.lmax.disruptor.BatchEventProcessor;
@@ -50,9 +51,11 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
 
     private final RingBuffer<ReplyBatchEvent> replyRing;
 
-    private AtomicLong nextIDToHandle = new AtomicLong();
+    @VisibleForTesting
+    AtomicLong nextIDToHandle = new AtomicLong();
 
-    private PriorityQueue<ReplyBatchEvent> futureEvents;
+    @VisibleForTesting
+    PriorityQueue<ReplyBatchEvent> futureEvents;
 
     // Metrics
     private final Meter abortMeter;
@@ -90,7 +93,8 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
 
     }
 
-    private void handleReplyBatchEvent(ReplyBatchEvent replyBatchEvent) throws Exception {
+    @VisibleForTesting
+    void handleReplyBatchEvent(ReplyBatchEvent replyBatchEvent) throws Exception {
 
         String name;
         Batch batch = replyBatchEvent.getBatch();
@@ -116,6 +120,8 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
                 sendTimestampResponse(event.getStartTimestamp(), event.getChannel());
                 event.getMonCtx().timerStop(name);
                 break;
+            case COMMIT_RETRY:
+                throw new RuntimeException("COMMIT_RETRY events must be filtered before this step. Event: {}");
             }
             event.getMonCtx().publish();
         }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
index 372733e..32f5b8c 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
@@ -205,8 +205,14 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
             }
             persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());
 
-        } else { // add it to the aborted list
-            persistProc.addAbortToBatch(startTimestamp, isCommitRetry, c, event.getMonCtx());
+        } else {
+
+            if (isCommitRetry) { // Re-check if it was already committed but the client retried due to a lag replying
+                persistProc.addCommitRetryToBatch(startTimestamp, c, event.getMonCtx());
+            } else {
+                persistProc.addAbortToBatch(startTimestamp, c, event.getMonCtx());
+            }
+
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
index c472606..448eeaa 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
@@ -70,12 +70,14 @@ public class TestBatch {
 
         // Test when filling the batch with different types of events, that becomes full
         for (int i = 0; i < BATCH_SIZE; i++) {
-            if (i % 3 == 0) {
+            if (i % 4 == 0) {
                 batch.addTimestamp(ANY_ST, channel, monCtx);
-            } else if (i % 3 == 1) {
+            } else if (i % 4 == 1) {
                 batch.addCommit(ANY_ST, ANY_CT, channel, monCtx);
+            } else if (i % 4 == 2) {
+                batch.addCommitRetry(ANY_ST, channel, monCtx);
             } else {
-                batch.addAbort(ANY_ST, false, channel, monCtx);
+                batch.addAbort(ANY_ST, channel, monCtx);
             }
         }
         assertFalse(batch.isEmpty(), "Batch should contain elements");
@@ -95,7 +97,8 @@ public class TestBatch {
         // Check the first 3 events and the last one correspond to the filling done above
         assertTrue(batch.get(0).getType().equals(PersistEvent.Type.TIMESTAMP));
         assertTrue(batch.get(1).getType().equals(PersistEvent.Type.COMMIT));
-        assertTrue(batch.get(2).getType().equals(PersistEvent.Type.ABORT));
+        assertTrue(batch.get(2).getType().equals(PersistEvent.Type.COMMIT_RETRY));
+        assertTrue(batch.get(3).getType().equals(PersistEvent.Type.ABORT));
 
         // Set a new value for last element in Batch and check we obtain the right result
         batch.decreaseNumEvents();
@@ -137,10 +140,11 @@ public class TestBatch {
         // Put some elements in the batch...
         batch.addTimestamp(ANY_ST, channel, monCtx);
         batch.addCommit(ANY_ST, ANY_CT, channel, monCtx);
-        batch.addAbort(ANY_ST, false, channel, monCtx);
+        batch.addCommitRetry(ANY_ST, channel, monCtx);
+        batch.addAbort(ANY_ST, channel, monCtx);
         assertFalse(batch.isEmpty(), "Batch should contain elements");
         assertFalse(batch.isFull(), "Batch should NOT be full");
-        assertEquals(batch.getNumEvents(), 3, "Num events should be 3");
+        assertEquals(batch.getNumEvents(), 4, "Num events should be 4");
 
         // ... and passivate the object through the factory. It should reset the state of the batch
         factory.passivateObject(pooledBatch);

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
index 4dcc2b0..d60d019 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
@@ -185,11 +185,9 @@ public class TestPersistenceProcessorHandler {
     @Test(timeOut = 10_000)
     public void testProcessingOfBatchPersistEventWithASingleAbortEventNoRetry() throws Exception {
 
-        final boolean IS_RETRY = false;
-
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        batch.addAbort(FIRST_ST, null, mock(MonitoringContext.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
         persistenceHandler.onEvent(batchEvent);
@@ -204,13 +202,11 @@ public class TestPersistenceProcessorHandler {
     }
 
     @Test(timeOut = 10_000)
-    public void testProcessingOfBatchPersistEventWithASingleAbortEventWithRetry() throws Exception {
-
-        final boolean IS_RETRY = true;
+    public void testProcessingOfBatchPersistEventWithASingleCommitRetryEvent() throws Exception {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -226,14 +222,12 @@ public class TestPersistenceProcessorHandler {
     }
 
     @Test(timeOut = 10_000)
-    public void testProcessingOfBatchPersistEventWith2EventsCommitAndAbortWithRetry() throws Exception {
-
-        final boolean IS_RETRY = true;
+    public void testProcessingOfBatchPersistEventWith2EventsCommitAndCommitRetry() throws Exception {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
         batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
-        batch.addAbort(SECOND_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -254,16 +248,14 @@ public class TestPersistenceProcessorHandler {
     }
 
     @Test(timeOut = 10_000)
-    public void testProcessingOfBatchPersistEventWith2EventsAbortWithRetryAndCommit() throws Exception {
+    public void testProcessingOfBatchPersistEventWith2EventsCommitRetryAndCommit() throws Exception {
         // ------------------------------------------------------------------------------------------------------------
-        // Same test as testProcessingOfBatchPersistEventWith2EventsCommitAndAbortWithRetry but swapped events
+        // Same test as testProcessingOfBatchPersistEventWith2EventsCommitAndCommitRetry but swapped events
         // ------------------------------------------------------------------------------------------------------------
 
-        final boolean IS_RETRY = true;
-
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
         batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContext.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -285,14 +277,12 @@ public class TestPersistenceProcessorHandler {
     }
 
     @Test(timeOut = 10_000)
-    public void testProcessingOfBatchPersistEventWith2AbortWithRetryEvents() throws Exception {
-
-        final boolean IS_RETRY = true;
+    public void testProcessingOfBatchPersistEventWith2CommitRetryEvents() throws Exception {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
-        batch.addAbort(SECOND_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -312,14 +302,12 @@ public class TestPersistenceProcessorHandler {
     }
 
     @Test(timeOut = 10_000)
-    public void testProcessingOfBatchPersistEventWith2NonRetryAbortEvents() throws Exception {
-
-        final boolean IS_RETRY = false;
+    public void testProcessingOfBatchPersistEventWith2AbortEvents() throws Exception {
 
         // Prepare test batch
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
-        batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
-        batch.addAbort(SECOND_ST, IS_RETRY, null, mock(MonitoringContext.class));
+        batch.addAbort(FIRST_ST, null, mock(MonitoringContext.class));
+        batch.addAbort(SECOND_ST, null, mock(MonitoringContext.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 
@@ -347,11 +335,11 @@ public class TestPersistenceProcessorHandler {
         Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
 
         batch.addTimestamp(FIRST_ST, null, mock(MonitoringContext.class));
-        batch.addAbort(SECOND_ST, true, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
         batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContext.class));
-        batch.addAbort(FOURTH_ST, false, null, mock(MonitoringContext.class));
+        batch.addAbort(FOURTH_ST, null, mock(MonitoringContext.class));
         batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContext.class));
-        batch.addAbort(SIXTH_ST, true, null, mock(MonitoringContext.class));
+        batch.addCommitRetry(SIXTH_ST, null, mock(MonitoringContext.class));
         PersistBatchEvent batchEvent = new PersistBatchEvent();
         PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
index 5b17d1e..8f2b06e 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
@@ -112,7 +112,7 @@ public class TestRequestProcessor {
 
         List<Long> writeSet = Lists.newArrayList(1L, 20L, 203L);
         requestProc.commitRequest(firstTS - 1, writeSet, false, null, new MonitoringContext(metrics));
-        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - 1), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - 1), any(Channel.class), any(MonitoringContext.class));
 
         requestProc.commitRequest(firstTS, writeSet, false, null, new MonitoringContext(metrics));
         ArgumentCaptor<Long> commitTScapture = ArgumentCaptor.forClass(Long.class);
@@ -136,7 +136,7 @@ public class TestRequestProcessor {
         requestProc.commitRequest(thirdTS, writeSet, false, null, new MonitoringContext(metrics));
         verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContext.class));
         requestProc.commitRequest(secondTS, writeSet, false, null, new MonitoringContext(metrics));
-        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), any(Channel.class), any(MonitoringContext.class));
 
     }
 
@@ -159,7 +159,7 @@ public class TestRequestProcessor {
 
         // ...check that the transaction is aborted when trying to commit
         requestProc.commitRequest(startTS, writeSet, false, null, new MonitoringContext(metrics));
-        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+        verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), any(Channel.class), any(MonitoringContext.class));
 
     }
 


[2/7] incubator-omid git commit: [ci skip] Add NOTICE

Posted by fp...@apache.org.
[ci skip] Add NOTICE

Change-Id: I386f5fa20d2578dc87a9b1eb93441c9cea4eb06b


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/fb63cf39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/fb63cf39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/fb63cf39

Branch: refs/heads/master
Commit: fb63cf3940c216f6eb175f45e067be5f64c9097f
Parents: fcc56bb
Author: Igor Katkov <ka...@yahoo-inc.com>
Authored: Wed May 18 11:45:04 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 18 12:26:31 2016 -0700

----------------------------------------------------------------------
 NOTICE | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/fb63cf39/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
new file mode 100644
index 0000000..7e71dbc
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1,23 @@
+Apache Omid
+Copyright 2016 The Apache Software Foundation.
+
+This product includes software developed at The Apache Software Foundation (http://www.apache.org/).
+
+This distribution has a binary dependency on:
+
+Google Guava v11.0.2 (https://github.com/google/guava) [Apache 2.0]
+Guice v3.0 (https://github.com/google/guice/wiki) [Apache 2.0]
+Testng v6.8.8 (http://testng.org) [Apache 2.0]
+SLF4J (http://www.slf4j.org/) v1.7.7 [MIT License]
+Netty (http://netty.io) v3.2.6.Final [Apache 2.0]
+Google Protocol Buffers v2.5.0 (https://developers.google.com/protocol-buffers/) [BSD License]
+Mockito (http://mockito.org/) v1.9.5 [MIT License]
+LMAX Disruptor v3.2.0 (https://lmax-exchange.github.io/disruptor/) [Apache 2.0]
+Coda Hale/Yammer.com Dropwizard Metrics v3.0.1 (http://metrics.dropwizard.io/3.1.0/) [Apache 2.0]
+JCommander v1.35 (http://jcommander.org/) [Apache 2.0]
+Hamcrest v1.3 (http://hamcrest.org/JavaHamcrest/) [BSD License]
+Snakeyml v1.11 (https://bitbucket.org/asomov/snakeyaml) [Apache 2.0]
+Google FindBugs v3.0.1 (http://findbugs.sourceforge.net/) [Lesser GNU General Public License]
+
+
+