You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by fp...@apache.org on 2016/05/18 19:34:36 UTC
[1/7] incubator-omid git commit: [ci skip]prepare for next
development iteration [Forced Update!]
Repository: incubator-omid
Updated Branches:
refs/heads/master 104697504 -> b77cce396 (forced update)
[ci skip]prepare for next development iteration
Change-Id: Id9f05c50fea98b33be83a934b89bab81c7ad1762
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/a8bee71b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/a8bee71b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/a8bee71b
Branch: refs/heads/master
Commit: a8bee71b112c1eac836618945c11a73422eef096
Parents: 8ff9fff
Author: Igor Katkov <ka...@yahoo-inc.com>
Authored: Thu May 12 12:52:09 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 18 12:26:23 2016 -0700
----------------------------------------------------------------------
benchmarks/pom.xml | 2 +-
codahale-metrics/pom.xml | 2 +-
commit-table/pom.xml | 2 +-
common/pom.xml | 2 +-
examples/pom.xml | 2 +-
hbase-client/pom.xml | 2 +-
hbase-commit-table/pom.xml | 2 +-
hbase-common/pom.xml | 2 +-
hbase-coprocessor/pom.xml | 2 +-
hbase-shims/hbase-0/pom.xml | 2 +-
hbase-shims/hbase-1/pom.xml | 2 +-
hbase-shims/pom.xml | 2 +-
hbase-tools/pom.xml | 2 +-
metrics/pom.xml | 2 +-
pom.xml | 2 +-
statemachine/pom.xml | 2 +-
timestamp-storage/pom.xml | 2 +-
transaction-client/pom.xml | 2 +-
tso-server/pom.xml | 2 +-
19 files changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/benchmarks/pom.xml
----------------------------------------------------------------------
diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml
index 62cce83..e24d342 100644
--- a/benchmarks/pom.xml
+++ b/benchmarks/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.apache.omid</groupId>
<artifactId>omid</artifactId>
- <version>0.8.1.53-SNAPSHOT</version>
+ <version>0.8.2.1-SNAPSHOT</version>
</parent>
<artifactId>benchmarks</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/codahale-metrics/pom.xml
----------------------------------------------------------------------
diff --git a/codahale-metrics/pom.xml b/codahale-metrics/pom.xml
index ba5d560..bafcdc8 100644
--- a/codahale-metrics/pom.xml
+++ b/codahale-metrics/pom.xml
@@ -4,7 +4,7 @@
<parent>
<artifactId>omid</artifactId>
<groupId>org.apache.omid</groupId>
- <version>0.8.1.53-SNAPSHOT</version>
+ <version>0.8.2.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/commit-table/pom.xml
----------------------------------------------------------------------
diff --git a/commit-table/pom.xml b/commit-table/pom.xml
index 5cc7be0..d904359 100644
--- a/commit-table/pom.xml
+++ b/commit-table/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.apache.omid</groupId>
<artifactId>omid</artifactId>
- <version>0.8.1.53-SNAPSHOT</version>
+ <version>0.8.2.1-SNAPSHOT</version>
</parent>
<artifactId>commit-table</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index 5e84383..b0cbc79 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.apache.omid</groupId>
<artifactId>omid</artifactId>
- <version>0.8.1.53-SNAPSHOT</version>
+ <version>0.8.2.1-SNAPSHOT</version>
</parent>
<artifactId>common</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 4a704cf..fdfb0fa 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -6,7 +6,7 @@
<parent>
<artifactId>omid</artifactId>
<groupId>org.apache.omid</groupId>
- <version>0.8.1.53-SNAPSHOT</version>
+ <version>0.8.2.1-SNAPSHOT</version>
</parent>
<artifactId>examples</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/hbase-client/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml
index 01818ba..4e2e55d 100644
--- a/hbase-client/pom.xml
+++ b/hbase-client/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.apache.omid</groupId>
<artifactId>omid</artifactId>
- <version>0.8.1.53-SNAPSHOT</version>
+ <version>0.8.2.1-SNAPSHOT</version>
</parent>
<artifactId>hbase-client</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/hbase-commit-table/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-commit-table/pom.xml b/hbase-commit-table/pom.xml
index c0b5104..4bd7a89 100644
--- a/hbase-commit-table/pom.xml
+++ b/hbase-commit-table/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.apache.omid</groupId>
<artifactId>omid</artifactId>
- <version>0.8.1.53-SNAPSHOT</version>
+ <version>0.8.2.1-SNAPSHOT</version>
</parent>
<artifactId>hbase-commit-table</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/hbase-common/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-common/pom.xml b/hbase-common/pom.xml
index 76a1272..3373702 100644
--- a/hbase-common/pom.xml
+++ b/hbase-common/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.apache.omid</groupId>
<artifactId>omid</artifactId>
- <version>0.8.1.53-SNAPSHOT</version>
+ <version>0.8.2.1-SNAPSHOT</version>
</parent>
<artifactId>hbase-common</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/hbase-coprocessor/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/pom.xml b/hbase-coprocessor/pom.xml
index 7dd7f16..d14d8dc 100644
--- a/hbase-coprocessor/pom.xml
+++ b/hbase-coprocessor/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.apache.omid</groupId>
<artifactId>omid</artifactId>
- <version>0.8.1.53-SNAPSHOT</version>
+ <version>0.8.2.1-SNAPSHOT</version>
</parent>
<artifactId>hbase-coprocessor</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/hbase-shims/hbase-0/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-shims/hbase-0/pom.xml b/hbase-shims/hbase-0/pom.xml
index 84659de..4593485 100644
--- a/hbase-shims/hbase-0/pom.xml
+++ b/hbase-shims/hbase-0/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.apache.omid</groupId>
<artifactId>omid-shims-aggregator</artifactId>
- <version>0.8.1.53-SNAPSHOT</version>
+ <version>0.8.2.1-SNAPSHOT</version>
</parent>
<artifactId>hbase0-shims</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/hbase-shims/hbase-1/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-shims/hbase-1/pom.xml b/hbase-shims/hbase-1/pom.xml
index 924c5c7..e16494e 100644
--- a/hbase-shims/hbase-1/pom.xml
+++ b/hbase-shims/hbase-1/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.apache.omid</groupId>
<artifactId>omid-shims-aggregator</artifactId>
- <version>0.8.1.53-SNAPSHOT</version>
+ <version>0.8.2.1-SNAPSHOT</version>
</parent>
<artifactId>hbase1-shims</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/hbase-shims/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-shims/pom.xml b/hbase-shims/pom.xml
index 77fca87..3d2aa99 100644
--- a/hbase-shims/pom.xml
+++ b/hbase-shims/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.apache.omid</groupId>
<artifactId>omid</artifactId>
- <version>0.8.1.53-SNAPSHOT</version>
+ <version>0.8.2.1-SNAPSHOT</version>
</parent>
<artifactId>omid-shims-aggregator</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/hbase-tools/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-tools/pom.xml b/hbase-tools/pom.xml
index 762b021..7c57978 100644
--- a/hbase-tools/pom.xml
+++ b/hbase-tools/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.apache.omid</groupId>
<artifactId>omid</artifactId>
- <version>0.8.1.53-SNAPSHOT</version>
+ <version>0.8.2.1-SNAPSHOT</version>
</parent>
<artifactId>hbase-tools</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/metrics/pom.xml
----------------------------------------------------------------------
diff --git a/metrics/pom.xml b/metrics/pom.xml
index a22d924..577b699 100644
--- a/metrics/pom.xml
+++ b/metrics/pom.xml
@@ -6,7 +6,7 @@
<parent>
<artifactId>omid</artifactId>
<groupId>org.apache.omid</groupId>
- <version>0.8.1.53-SNAPSHOT</version>
+ <version>0.8.2.1-SNAPSHOT</version>
</parent>
<artifactId>metrics</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 652df9c..d6137e9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -13,7 +13,7 @@
<artifactId>omid</artifactId>
<packaging>pom</packaging>
<!-- WARNING: do not update version manually, use mvn versions:set -->
- <version>0.8.1.53-SNAPSHOT</version>
+ <version>0.8.2.1-SNAPSHOT</version>
<organization>
<name>Apache Software Foundation</name>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/statemachine/pom.xml
----------------------------------------------------------------------
diff --git a/statemachine/pom.xml b/statemachine/pom.xml
index e5c47d6..3cf875b 100644
--- a/statemachine/pom.xml
+++ b/statemachine/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.apache.omid</groupId>
<artifactId>omid</artifactId>
- <version>0.8.1.53-SNAPSHOT</version>
+ <version>0.8.2.1-SNAPSHOT</version>
</parent>
<name>State Machine</name>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/timestamp-storage/pom.xml
----------------------------------------------------------------------
diff --git a/timestamp-storage/pom.xml b/timestamp-storage/pom.xml
index 03b07b5..f0d777a 100644
--- a/timestamp-storage/pom.xml
+++ b/timestamp-storage/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.apache.omid</groupId>
<artifactId>omid</artifactId>
- <version>0.8.1.53-SNAPSHOT</version>
+ <version>0.8.2.1-SNAPSHOT</version>
</parent>
<artifactId>timestamp-storage</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/transaction-client/pom.xml
----------------------------------------------------------------------
diff --git a/transaction-client/pom.xml b/transaction-client/pom.xml
index fe038c4..2be9e67 100644
--- a/transaction-client/pom.xml
+++ b/transaction-client/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.apache.omid</groupId>
<artifactId>omid</artifactId>
- <version>0.8.1.53-SNAPSHOT</version>
+ <version>0.8.2.1-SNAPSHOT</version>
</parent>
<artifactId>transaction-client</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/a8bee71b/tso-server/pom.xml
----------------------------------------------------------------------
diff --git a/tso-server/pom.xml b/tso-server/pom.xml
index a7dfdf5..86800e5 100644
--- a/tso-server/pom.xml
+++ b/tso-server/pom.xml
@@ -5,7 +5,7 @@
<parent>
<groupId>org.apache.omid</groupId>
<artifactId>omid</artifactId>
- <version>0.8.1.53-SNAPSHOT</version>
+ <version>0.8.2.1-SNAPSHOT</version>
</parent>
<artifactId>tso-server</artifactId>
[7/7] incubator-omid git commit: [ci skip] Add 4096 keys in KEYS
Posted by fp...@apache.org.
[ci skip] Add 4096 keys in KEYS
Change-Id: I4d97845d423f9b15f7b1ce223e70f3c756dd34c3
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/b77cce39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/b77cce39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/b77cce39
Branch: refs/heads/master
Commit: b77cce3960151086c53b1e58f4c01c42cc75cd68
Parents: fb63cf3
Author: Igor Katkov <ka...@yahoo-inc.com>
Authored: Wed May 18 12:01:31 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 18 12:32:48 2016 -0700
----------------------------------------------------------------------
KEYS | 113 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 113 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/b77cce39/KEYS
----------------------------------------------------------------------
diff --git a/KEYS b/KEYS
new file mode 100644
index 0000000..9142a46
--- /dev/null
+++ b/KEYS
@@ -0,0 +1,113 @@
+This file contains the GPG keys of Apache Omid developers.
+
+Users: gpg --import KEYS
+
+Developers:
+ Create a key:
+ gpg --gen-key
+
+ Adding you key to this file:
+ (gpg --fingerprint <key id> && gpg --armor --export <key id>) >> this file.
+
+ Publish the key:
+ gpg --keyserver pgp.mit.edu --send-keys <key id>
+
+ Signing another developers key:
+ gpg --keyserver pgp.mit.edu --search-keys <name or email>
+ gpg --keyserver pgp.mit.edu --recv-keys <key id>
+ gpg --sign-key <key id>
+ gpg --keyserver pgp.mit.edu --send-keys <key id>
+
+ Additional Information:
+ http://www.apache.org/dev/openpgp.html#generate-key
+
+********************************* PLEASE NOTE **********************************
+
+ Releases will be signed using one of these keys in this file. This file will
+ be available with the distributed Apache Omid releases at:
+
+ https://dist.apache.org/repos/dist/release/incubator/omid/KEYS
+
+********************************************************************************
+
+pub 4096R/7DDE0136 2016-05-13
+ Key fingerprint = 719B 6514 E694 433F 704E 221C 6407 EEB3 7DDE 0136
+ uid [ultimate] Igor Katkov <ik...@apache.org>
+ sub 4096R/ED70F1F3 2016-05-13
+
+-----BEGIN PGP PUBLIC KEY BLOCK-----
+Version: GnuPG v2
+
+mQENBFc0/q4BCADC/WsQwFK9hFAsE2a2jR1hffUMAPOz+yu2r0c7UyRyCB6Ti+2i
+m7eHNrml0piad7JVxkw6yyPMxL3P7jH9emWCFTmhGenAtLrbRn8nv9CKKxHTCdkY
+lp2HRFcXqSybr9OiuxELKMxtatpzfNkhBLNpVtYjyNJSPU51n7aW/SeJwaaPC+22
+7TajGB5SbUrSSEXlanpIzXD1AFzrUCZ/EQBSAfQsIWtum4tiCZt7KonOgCkMrshR
+qmmAof1SXYSIGN8hOEkVW/GaFLqELKndLG02QbcNtf/ASlw6K4cWXBvJ1ddokZk8
+SdxCj22quyerrxJiJ5M3Fo6uwkBoBzh+bbZXABEBAAG0IElnb3IgS2F0a292IDxp
+a2F0a292QGFwYWNoZS5vcmc+iQE4BBMBAgAiBQJXNP6uAhsDBgsJCAcDAgYVCAIJ
+CgsEFgIDAQIeAQIXgAAKCRDg1uSyxqugq1NzB/0dlItBpTydQB7RhMBqzbpP1QXr
+MHtgdQDJBckau6IP0ITTXompLsxJCbH4hrfecpUyF1KGWq5lU+KAQNkL9uydlzrx
+eejbyz0uOpuquWSPfj93zoFgSFrfNnCPQNTRvbLKg/YpO3z+CkDcWlPW/Zob7k8Y
+jAHpzVP0bc21Vvu3gmIBUrttHlUtogjU/gP2EtoWs1YO3vqZEUAQG0PbiWGvqBop
+ki7xGyadwbmQJfeaxCkhHunChBsJSl/ev/wowwMSXEXyzBjzb7iJDZmtjWzdat5x
+d8hHGgEWDf4mPKhw2aIaLEkIUCaBiE+sYvPLe/cIMmoQPZ7V0T8Cq12EXdjhuQEN
+BFc0/q4BCADGx5W0e53RmbqJh2K7w6JqNtFEC6s1wD9IixnxRbUQHqL4lduUBdcW
+R4umSp1o3SK7rA2QnKHarNNY+x9AKtUxC/wGH59yv6omlE9HnAudVReGh5hvIip2
+KDzvFbBPfgJR6jI+RVdCf9NJjm1HQVFV20BK5XilkwYYeK7C7ioH27FvWSWWeVDj
+Y022yT0QAEa9NRUWIAE/YZleazbZ/0O++NRm2fiBD4592/WvzcCqvtsNRDzUfg5c
+wY0p7oiqTYl0lm9bsGLxV7hD0crIVTm+qPIIDDhgzSfJWke1sFZGAD9s+WwJ/EuU
+hPPO2iFxmn3QcpmLOEKAbPXTOVZ3FmyLABEBAAGJAR8EGAECAAkFAlc0/q4CGwwA
+CgkQ4NbkssaroKstGgf/fHFUm6diXe4MOLLBNpxGWrG+6rc3v42v/QzObkQAZSyu
+1ml4RfunzWQjNN3IoIQ6yvSrLM6gFavTT+uhu/2y6HMiSqNyBBMixVKUoOpd/HHh
+lHtz72kyYeDlaNKre4IUvA7Y+9sdkM1t2YyE6AuGWYF/55R699112AqH4B5WHSzj
+rgglzECeZmG88Y9V020axhmpzOC3HdiQOujZHCq8Gvc58nbxaf8wsvZZ59E3YjUD
+M4eOr1YZ6BQnrxy0jNnKPwXL+lI68S38Fy9IaGEeXH4Rv03qmeaeAucvyTMzB9Kw
++Cd1v5MmlWKAPPC1g8TmnGGoR+9ptWWL35D48WlwsJkCDQRXNlXvARAA20nu3iVj
+yZPlyrMmimEvtPPnSSM/dURdQG2Gs6E6XvU7F8r2BkzH6PImIYxAWlGeyesaPymh
+NfMxG7Sz5FiLKtlFpK/iVKqEdXxP6U57LVleR3hisT0rO/C4CZ53AdwF9YhYz625
+zDEq4AkX94q74RC2pz7AA2LgLunYuwdns31SoRL/fnfJnqOqEdiyS5n395pzSKaD
+fbcLaq+81wQQtmkQNv3B/JmJsjkx5WKAfweImuRERDrbOpmWW8hHLDZ8etberG8T
+aIAkL4sIQTmZ+EGwkmLUBgWHmxDwMlJW7ECm+qDxvvt42onQUHsMVBbAORSRGdtN
+omO0j/InKeHh6EZbsGiuUqM5/oJJTQd+qyp0icOAjYzbjF9TA22ChfqVjn8TrFbf
+97G7XFIsxdrv9OdmtqzsBk4Ds3H2/mh0d7HUm2VDQ4nsDxevenY7MSLDhNCjE8wX
+mgpmxsm4GXVY2yLIaV6wi1wzP6XghIqS/jXpL4ymTF8s1PoEG+a8mG2QjDVu0W2E
+TbqMGAjvnQTdwVkRjkFPueeGssn0F1yeevuP6eaxGD4youFzwivQc7jsqXonUF0A
+Uop8qQzlp9JLk3qPvHHtDOmtsrR0TxteGVzrJ1m54LuYEqYRbIM3KxOVjphIdR79
+pA7s9pfXyhINty8dlWoxJaC46i24w3BapT0AEQEAAbQgSWdvciBLYXRrb3YgPGlr
+YXRrb3ZAYXBhY2hlLm9yZz6JAjkEEwEIACMFAlc2Ve8CGwMHCwkIBwMCAQYVCAIJ
+CgsEFgIDAQIeAQIXgAAKCRBkB+6zfd4BNmJUD/9K5Ktqm55epkW47ithg9Sk1o6/
+AAqMp56aU7xJRzeHRqsg3n7Z+1nP2LYFZo+so3NJ9RH40xPBD4UF8W2fu+Ks04Zg
+y5IhJuSYxyFzVGulCsiSFVQsAmb4qqBHMFvXmi7gt7W6f7tYvwbqY/RT7uWRmg5K
+HlH3WtBYwoBJ541eNE9WCScQ16R/+CQMoYzCYRf1woKyGe5wqu2SEShh8T/Qdn3n
+jjJc1Y1JTSYI/KobLVkvVop0Z9gG+yzj/4boiik/SHm8D/ZM4Xlx5x3Ju5foqHDT
+n6p1NRdDY8ryE656opxKa7fsTD8t1SUHgpX/zH10YLVwhUcsTJRyHZhcKkgrzohz
+FGAAGPsEhHXO15A7vAGxKIkCgWuN04JITsYAOzTiXnzMhMnDn9zi0tz1dw2Bf+i9
+sbBbLxrHTCarj90tRtcgn8hiFTpBgEMySMjaPUWt1RZeesAN60yKr3qWwDa65ZED
+mIIGJ0l1Dfz1LJuLyCjDEQKC38yhgrVttYIfzbn/9V95vz+xOWauCeSszF497zP4
+Gx8Q7w9SzmYwT1cv4ZsHEEHl1ryF59mOwfS/ZzYjs4qksQaSB53Ah1BvJ3Qxmpwe
+bA8UegF16uFAdJFsG8158nA9YPNrHXFKc2ltEd5EHacesE1FG2J1ZpSg4Bcgjb7N
+1zNJnvzv8vfKnQAuX7kCDQRXNlXvARAAzc7hecGPv1Zgph/8/PSCX0geWat5i19Q
+exgW9+lZ4TSO0Nr/TO7kShXRsMX6YQR5lEPPGZQqvAyyZTNcB5IjnCyamiarhgGd
+PRqvWqkUCsvwPTFe/OKUx2BWuze0ivE4V5x2lhZansEusT+E3KfdV5dU5eDN/2gl
+s+TZ7yX0X1lxjWM1UCipBo5QKGaViodBYGJKYdZcNDtfJq64hsG+N1/OBoA6+jwW
+41wmv1N3TZYPzrr08Wv3vm3rTlXQMMBZGKluKxzboOtPbmLwu4LwYw5L0UrN18Qj
+kUk8eECYHk38eGrZDhRrvdG4bNGOVMYMUajhrD41SsB0Vk5mZK7ETYWbEnRJc41c
+y4p2Y69+lMdbqKTJ7mPw1+co3YqLfQJePqryBskrHmhJWELBtxybidh8VaUz5i0w
+qD+OPA0mnDX8zkuwl3UV6x7vTQql35iGCaDGnyNN5bIVFfxQF7x9mtZXQtf1nsQx
+I8A1lNvIXIe1lR8MPH2RavpQrEl0GzC1DK23zRviPBHyf7IezjF+B/8IzrlXD19u
+7sJ6YlZyaHlHUqZXJPUVKbM/AqSfbKpXmYeWXuIFaMOrF2ZWWt9V/r/duGZk1C69
+5X8uqHvfjuVZxE1bh2Xilo2h0adQFBZ3jBAPHDPh0GAppIMOhY7z91rUHNfnkobv
+mVUg7eU7apsAEQEAAYkCHwQYAQgACQUCVzZV7wIbDAAKCRBkB+6zfd4BNgu8D/sH
++1umPcnjGOXtD2W6FX6t3C3TzSOJ0GZv5ycv/9OKTBFw2HCwtLYWcbwY60Jgo8h5
+syLYfYGj5tQwtSLKx/6MtNPTK+JFK7++/OikQ7pFCeZvphP22JBs7sWfuR6TK/kJ
+NSNwNUoICBvKY03hnma4XVUHgadXJ9pfHZ49TaC+/ZkKYh2dLbNmVBHSI5rcjpD7
+YY6m+DjVXbDjr6mA8okcF/IpNuEs4l1BEy0YkbxR37UrPt9CBYOQwq+9j7MymoaQ
+75ZGZPhL+e44vwUVCPCvsdwcMhEtmAg5VbZ95q/eE201bKnIV/7ipkhlRs7iyTqS
+KnVeCcnowTFUQi3SpEuxK6gcm8mN28Nq2RpOdy5KV8GlVnN1wu5quWYw0YsjO+bc
+9d7Pyfb7inCzwWysG1z9H9vFLBbAnGhanwyeLPoyoVDjN+1egqesa6e8AQ8nIN1g
+oW4oidKktQkX8ca5LriWgS8/6H0S1C08SIhom11R3eA1sfJ6OabSomAKBVXlNRsq
+6ZPs31DelnPFlnCwF3d3PeL720sPGsaCdeChTFqWYbbferHCKoEdmYcrqVAxsG2q
+Io2Cs9jOV2iU4yYf3dJgUiBR/Vq4Dt0AqNzPTNojBAuAUOP+d63+ojF9Qp/VqUKj
+GfBNQDqZTcwRAan7+TJD21r9uaJAIXYr/SQX8z0bbw==
+=oZEC
+-----END PGP PUBLIC KEY BLOCK-----
[3/7] incubator-omid git commit: Remove IOException from
TimestampOracle intf
Posted by fp...@apache.org.
Remove IOException from TimestampOracle intf
The unnecessary exception introduced a several unnecessary boilerplate code.
Also the TimestampOracle interface has been commented properly.
Change-Id: I510e7677f07299061aebfd23f7a8d189e177a4f1
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/484d116a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/484d116a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/484d116a
Branch: refs/heads/master
Commit: 484d116a50324f423b91514a6b4436cc90a13f05
Parents: a8bee71
Author: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Authored: Thu May 12 10:37:25 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 18 12:26:31 2016 -0700
----------------------------------------------------------------------
.../apache/omid/tso/RequestProcessorImpl.java | 40 ++++++++------------
.../org/apache/omid/tso/TimestampOracle.java | 16 ++++++--
.../apache/omid/tso/TimestampOracleImpl.java | 7 ++--
.../omid/tso/PausableTimestampOracle.java | 2 +-
.../java/org/apache/omid/tso/TestPanicker.java | 8 +---
.../apache/omid/tso/TestTimestampOracle.java | 8 +---
6 files changed, 36 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/484d116a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
index d101c72..0ad5c08 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
@@ -154,15 +154,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
private void handleTimestamp(RequestEvent requestEvent) throws Exception {
- long timestamp;
-
- try {
- timestamp = timestampOracle.next();
- } catch (IOException e) {
- LOG.error("Error getting timestamp", e);
- return;
- }
-
+ long timestamp = timestampOracle.next();
persistProc.addTimestampToBatch(timestamp, requestEvent.getChannel(), requestEvent.getMonCtx());
}
@@ -195,27 +187,25 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
if (txCanCommit) {
// 2. commit
- try {
- long commitTimestamp = timestampOracle.next();
- if (numCellsInWriteset > 0) {
- long newLowWatermark = lowWatermark;
+ long commitTimestamp = timestampOracle.next();
- for (long r : writeSet) {
- long removed = hashmap.putLatestWriteForCell(r, commitTimestamp);
- newLowWatermark = Math.max(removed, newLowWatermark);
- }
+ if (numCellsInWriteset > 0) {
+ long newLowWatermark = lowWatermark;
- if (newLowWatermark != lowWatermark) {
- LOG.trace("Setting new low Watermark to {}", newLowWatermark);
- lowWatermark = newLowWatermark;
- persistProc.persistLowWatermark(newLowWatermark); // Async persist
- }
+ for (long r : writeSet) {
+ long removed = hashmap.putLatestWriteForCell(r, commitTimestamp);
+ newLowWatermark = Math.max(removed, newLowWatermark);
+ }
+
+ if (newLowWatermark != lowWatermark) {
+ LOG.trace("Setting new low Watermark to {}", newLowWatermark);
+ lowWatermark = newLowWatermark;
+ persistProc.persistLowWatermark(newLowWatermark); // Async persist
}
- persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());
- } catch (IOException e) {
- LOG.error("Error committing tx {}", startTimestamp, e);
}
+ persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());
+
} else { // add it to the aborted list
persistProc.addAbortToBatch(startTimestamp, isRetry, c, event.getMonCtx());
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/484d116a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracle.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracle.java b/tso-server/src/main/java/org/apache/omid/tso/TimestampOracle.java
index f6c6127..3b795ef 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracle.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TimestampOracle.java
@@ -19,16 +19,26 @@ package org.apache.omid.tso;
import java.io.IOException;
+/**
+ * Functionality of a service delivering monotonic increasing timestamps.
+ */
public interface TimestampOracle {
+ /**
+ * Allows the initialization of the Timestamp Oracle service.
+ * @throws IOException
+ * raised if a problem during initialization is shown.
+ */
void initialize() throws IOException;
/**
- * Returns the next timestamp if available. Otherwise spins till the
- * ts-persist thread performs the new timestamp allocation
+ * Returns the next timestamp.
*/
- long next() throws IOException;
+ long next();
+ /**
+ * Returns the last timestamp assigned.
+ */
long getLast();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/484d116a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java b/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
index b5bbe4b..0a65c01 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
@@ -34,7 +34,7 @@ import java.util.concurrent.Executors;
import static org.apache.omid.metrics.MetricsUtils.name;
/**
- * The Timestamp Oracle that gives monotonically increasing timestamps
+ * The Timestamp Oracle that gives monotonically increasing timestamps.
*/
@Singleton
public class TimestampOracleImpl implements TimestampOracle {
@@ -129,12 +129,11 @@ public class TimestampOracleImpl implements TimestampOracle {
}
/**
- * Returns the next timestamp if available. Otherwise spins till the
- * ts-persist thread performs the new timestamp allocation
+ * Returns the next timestamp if available. Otherwise spins till the ts-persist thread allocates a new timestamp.
*/
@SuppressWarnings("StatementWithEmptyBody")
@Override
- public long next() throws IOException {
+ public long next() {
lastTimestamp++;
if (lastTimestamp == nextAllocationThreshold) {
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/484d116a/tso-server/src/test/java/org/apache/omid/tso/PausableTimestampOracle.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/PausableTimestampOracle.java b/tso-server/src/test/java/org/apache/omid/tso/PausableTimestampOracle.java
index 360bbe2..588acfd 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/PausableTimestampOracle.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/PausableTimestampOracle.java
@@ -39,7 +39,7 @@ public class PausableTimestampOracle extends TimestampOracleImpl {
}
@Override
- public long next() throws IOException {
+ public long next() {
while (tsoPaused) {
synchronized (this) {
try {
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/484d116a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
index 2964388..9b8ad29 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
@@ -77,12 +77,8 @@ public class TestPanicker {
Thread allocThread = new Thread("AllocThread") {
@Override
public void run() {
- try {
- while (true) {
- tso.next();
- }
- } catch (IOException ioe) {
- LOG.error("Shouldn't occur");
+ while (true) {
+ tso.next();
}
}
};
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/484d116a/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java b/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
index 501581b..c75e95b 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
@@ -97,12 +97,8 @@ public class TestTimestampOracle {
Thread allocThread = new Thread("AllocThread") {
@Override
public void run() {
- try {
- while (true) {
- timestampOracle.next();
- }
- } catch (IOException ioe) {
- LOG.error("Shouldn't occur");
+ while (true) {
+ timestampOracle.next();
}
}
};
[4/7] incubator-omid git commit: Add test battery for batch
re-ordering functionality in reply processor
Posted by fp...@apache.org.
Add test battery for batch re-ordering functionality in reply processor
Change-Id: I7105b1698ee4d1095baabdf844928518da57a4a7
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/fcc56bbc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/fcc56bbc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/fcc56bbc
Branch: refs/heads/master
Commit: fcc56bbc3ac376b8180351e5d7e1890de627acc9
Parents: 8954a34
Author: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Authored: Mon May 16 15:10:22 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 18 12:26:31 2016 -0700
----------------------------------------------------------------------
.../org/apache/omid/tso/TestReplyProcessor.java | 262 +++++++++++++++++++
1 file changed, 262 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/fcc56bbc/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
new file mode 100644
index 0000000..d7bebd6
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import org.apache.commons.pool2.ObjectPool;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.metrics.NullMetricsProvider;
+import org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent;
+import org.apache.omid.tso.ReplyProcessorImpl.ReplyBatchEvent;
+import org.jboss.netty.channel.Channel;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertEqualsNoOrder;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+public class TestReplyProcessor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestReplyProcessor.class);
+
+ private static final long ANY_DISRUPTOR_SEQUENCE = 1234L;
+
+ public static final int BATCH_POOL_SIZE = 3;
+
+ private static final long FIRST_ST = 0L;
+ private static final long FIRST_CT = 1L;
+ private static final long SECOND_ST = 2L;
+ private static final long SECOND_CT = 3L;
+ private static final long THIRD_ST = 4L;
+ private static final long THIRD_CT = 5L;
+ private static final long FOURTH_ST = 6L;
+ private static final long FOURTH_CT = 7L;
+ private static final long FIFTH_ST = 8L;
+ private static final long FIFTH_CT = 9L;
+ private static final long SIXTH_ST = 10L;
+
+ @Mock
+ private Panicker panicker;
+
+ private MetricsRegistry metrics;
+
+ private ObjectPool<Batch> batchPool;
+
+ // Component under test
+ private ReplyProcessorImpl replyProcessor;
+
+ @BeforeMethod(alwaysRun = true, timeOut = 30_000)
+ public void initMocksAndComponents() throws Exception {
+
+ MockitoAnnotations.initMocks(this);
+
+ TSOServerConfig tsoConfig = new TSOServerConfig();
+ tsoConfig.setNumConcurrentCTWriters(BATCH_POOL_SIZE);
+
+ // Configure null metrics provider
+ metrics = new NullMetricsProvider();
+
+ batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
+
+ replyProcessor = spy(new ReplyProcessorImpl(metrics, panicker, batchPool));
+
+ }
+
+ @AfterMethod
+ void afterMethod() {
+ }
+
+ @Test(timeOut = 10_000)
+ public void testBadFormedPackageThrowsException() throws Exception {
+
+ // We need an instance throwing exceptions for this test
+ replyProcessor = spy(new ReplyProcessorImpl(metrics, new RuntimeExceptionPanicker(), batchPool));
+
+ // Prepare test batch
+ Batch batch = batchPool.borrowObject();
+ batch.addCommitRetry(FIRST_ST, null, null );
+ ReplyBatchEvent e = ReplyBatchEvent.EVENT_FACTORY.newInstance();
+ ReplyBatchEvent.makeReplyBatch(e, batch, 0);
+
+ assertEquals(replyProcessor.nextIDToHandle.get(), 0);
+ assertEquals(replyProcessor.futureEvents.size(), 0);
+ assertEquals(batchPool.getNumActive(), 1);
+ assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
+
+ try {
+ replyProcessor.onEvent(e, ANY_DISRUPTOR_SEQUENCE, false);
+ fail();
+ } catch(RuntimeException re) {
+ // Expected
+ }
+
+ assertEquals(replyProcessor.nextIDToHandle.get(), 0);
+ assertEquals(replyProcessor.futureEvents.size(), 0);
+ assertEquals(batchPool.getNumActive(), 1);
+ assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
+
+ }
+
+ @Test(timeOut = 10_000)
+ public void testUnorderedBatchSequenceGetsSaved() throws Exception {
+
+ final long HIGH_SEQUENCE_NUMBER = 1234L; // Should be greater than 0
+
+ // Prepare test batch
+ Batch batch = batchPool.borrowObject();
+ ReplyBatchEvent e = ReplyBatchEvent.EVENT_FACTORY.newInstance();
+ ReplyBatchEvent.makeReplyBatch(e, batch, HIGH_SEQUENCE_NUMBER);
+
+ assertEquals(replyProcessor.nextIDToHandle.get(), 0);
+ assertEquals(replyProcessor.futureEvents.size(), 0);
+ assertEquals(batchPool.getNumActive(), 1);
+ assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
+
+ replyProcessor.onEvent(e, ANY_DISRUPTOR_SEQUENCE, false);
+
+ assertEquals(replyProcessor.nextIDToHandle.get(), 0);
+ assertEquals(replyProcessor.futureEvents.size(), 1);
+ assertEquals(batchPool.getNumActive(), 1);
+ assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
+ assertTrue(batch.isEmpty());
+ verify(replyProcessor, times(0)).handleReplyBatchEvent(any(ReplyBatchEvent.class));
+
+ }
+
+ @Test(timeOut = 10_000)
+ public void testProcessingOfEmptyBatchReplyEvent() throws Exception {
+
+ // Prepare test batch
+ Batch batch = batchPool.borrowObject();
+ ReplyBatchEvent e = ReplyBatchEvent.EVENT_FACTORY.newInstance();
+ ReplyBatchEvent.makeReplyBatch(e, batch, 0);
+
+ assertEquals(replyProcessor.nextIDToHandle.get(), 0);
+ assertEquals(replyProcessor.futureEvents.size(), 0);
+ assertEquals(batchPool.getNumActive(), 1);
+ assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
+
+ replyProcessor.onEvent(e, ANY_DISRUPTOR_SEQUENCE, false);
+
+ assertEquals(replyProcessor.nextIDToHandle.get(), 1);
+ assertEquals(replyProcessor.futureEvents.size(), 0);
+ assertEquals(batchPool.getNumActive(), 0);
+ assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE);
+ assertTrue(batch.isEmpty());
+ verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(e));
+
+ }
+
+ @Test(timeOut = 10_000)
+ public void testUnorderedSequenceOfBatchReplyEventsThatMustBeOrderedBeforeSendingReplies() throws Exception {
+
+ // Prepare 3 batches with events and simulate a different order of arrival using the batch sequence
+
+ // Prepare first a delayed batch (Batch #3)
+ Batch thirdBatch = batchPool.borrowObject();
+ thirdBatch.addTimestamp(FIRST_ST, mock(Channel.class), mock(MonitoringContext.class));
+ thirdBatch.addCommit(SECOND_ST, SECOND_CT, mock(Channel.class), mock(MonitoringContext.class));
+ ReplyBatchEvent thirdBatchEvent = ReplyBatchEvent.EVENT_FACTORY.newInstance();
+ ReplyBatchEvent.makeReplyBatch(thirdBatchEvent, thirdBatch, 2); // Set a higher sequence than the initial one
+
+ assertEquals(replyProcessor.nextIDToHandle.get(), 0);
+ assertEquals(replyProcessor.futureEvents.size(), 0);
+ assertEquals(batchPool.getNumActive(), 1);
+ assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
+
+ replyProcessor.onEvent(thirdBatchEvent, ANY_DISRUPTOR_SEQUENCE, false);
+
+ assertEquals(replyProcessor.nextIDToHandle.get(), 0);
+ assertEquals(replyProcessor.futureEvents.size(), 1);
+ assertEquals(batchPool.getNumActive(), 1);
+ assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 1);
+ assertFalse(thirdBatch.isEmpty());
+ verify(replyProcessor, never()).handleReplyBatchEvent(eq(thirdBatchEvent));
+
+ // Prepare another delayed batch (Batch #2)
+ Batch secondBatch = batchPool.borrowObject();
+ secondBatch.addTimestamp(THIRD_ST, mock(Channel.class), mock(MonitoringContext.class));
+ secondBatch.addCommit(FOURTH_ST, FOURTH_CT, mock(Channel.class), mock(MonitoringContext.class));
+ ReplyBatchEvent secondBatchEvent = ReplyBatchEvent.EVENT_FACTORY.newInstance();
+ ReplyBatchEvent.makeReplyBatch(secondBatchEvent, secondBatch, 1); // Set another higher sequence
+
+ replyProcessor.onEvent(secondBatchEvent, ANY_DISRUPTOR_SEQUENCE, false);
+
+ assertEquals(replyProcessor.nextIDToHandle.get(), 0);
+ assertEquals(replyProcessor.futureEvents.size(), 2);
+ assertEquals(batchPool.getNumActive(), 2);
+ assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE - 2);
+ assertFalse(secondBatch.isEmpty());
+ assertFalse(thirdBatch.isEmpty());
+
+ // Finally, prepare the batch that should trigger the execution of the other two
+ Batch firstBatch = batchPool.borrowObject();
+ firstBatch.addAbort(FIFTH_ST, mock(Channel.class), mock(MonitoringContext.class));
+ ReplyBatchEvent firstBatchEvent = ReplyBatchEvent.EVENT_FACTORY.newInstance();
+ ReplyBatchEvent.makeReplyBatch(firstBatchEvent, firstBatch, 0); // Set the first batch with a higher sequence
+
+ replyProcessor.onEvent(firstBatchEvent, ANY_DISRUPTOR_SEQUENCE, false);
+
+ assertEquals(replyProcessor.nextIDToHandle.get(), 3);
+ assertEquals(replyProcessor.futureEvents.size(), 0);
+ assertEquals(batchPool.getNumActive(), 0);
+ assertEquals(batchPool.getNumIdle(), BATCH_POOL_SIZE);
+ assertTrue(firstBatch.isEmpty());
+ assertTrue(secondBatch.isEmpty());
+ assertTrue(thirdBatch.isEmpty());
+
+ // Check the method calls have been properly ordered
+
+ InOrder inOrderReplyBatchEvents = inOrder(replyProcessor, replyProcessor, replyProcessor);
+ inOrderReplyBatchEvents.verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(firstBatchEvent));
+ inOrderReplyBatchEvents.verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(secondBatchEvent));
+ inOrderReplyBatchEvents.verify(replyProcessor, times(1)).handleReplyBatchEvent(eq(thirdBatchEvent));
+
+ InOrder inOrderReplies = inOrder(replyProcessor, replyProcessor, replyProcessor, replyProcessor, replyProcessor);
+ inOrderReplies.verify(replyProcessor, times(1)).sendAbortResponse(eq(FIFTH_ST), any(Channel.class));
+ inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(THIRD_ST), any(Channel.class));
+ inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(FOURTH_ST), eq(FOURTH_CT), any(Channel.class));
+ inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(FIRST_ST), any(Channel.class));
+ inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(SECOND_ST), eq(SECOND_CT), any(Channel.class));
+
+ }
+
+}
[5/7] incubator-omid git commit: Fix bug in retry request management
that didn't return batches to pool
Posted by fp...@apache.org.
Fix bug in retry request management that didn't return batches to pool
Add new complete tests for PersistentProcessorHandler component.
Also improve readability.
Change-Id: Ied72d420bb229c3466b7b89cf14b0756c93b2e31
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/3659b96c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/3659b96c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/3659b96c
Branch: refs/heads/master
Commit: 3659b96c5a775bd0f9bfbf9b65a67e5e7f0c6da2
Parents: 484d116
Author: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Authored: Fri May 13 14:08:09 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 18 12:26:31 2016 -0700
----------------------------------------------------------------------
.../main/java/org/apache/omid/tso/Batch.java | 49 +-
.../java/org/apache/omid/tso/PersistEvent.java | 12 +-
.../apache/omid/tso/PersistenceProcessor.java | 2 +-
.../omid/tso/PersistenceProcessorHandler.java | 107 ++--
.../omid/tso/PersistenceProcessorImpl.java | 4 +-
.../org/apache/omid/tso/ReplyProcessorImpl.java | 39 +-
.../apache/omid/tso/RequestProcessorImpl.java | 13 +-
.../tso/TestPersistenceProcessorHandler.java | 488 +++++++++++++++++++
8 files changed, 607 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/main/java/org/apache/omid/tso/Batch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/Batch.java b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
index c5ed696..06ddc71 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/Batch.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
@@ -35,11 +35,11 @@ public class Batch {
private final int id;
private final int size;
private int numEvents;
- private final PersistEvent[] events;
+ private final PersistEvent[] events; // TODO Check if it's worth to have a dynamic structure for this
Batch(int id, int size) {
- Preconditions.checkArgument(size > 0, "Size must be positive");
+ Preconditions.checkArgument(size > 0, "Size [%s] must be positive", size);
this.size = size;
this.id = id;
this.numEvents = 0;
@@ -51,41 +51,46 @@ public class Batch {
}
- boolean isFull() {
-
- Preconditions.checkState(numEvents <= size, "numEvents > size");
- return numEvents == size;
+ PersistEvent get(int idx) {
+ Preconditions.checkState(numEvents > 0 && 0 <= idx && idx < numEvents,
+ "Accessing Events array (Size = %s) with wrong index [%s]", numEvents, idx);
+ return events[idx];
+ }
+ void set(int idx, PersistEvent event) {
+ Preconditions.checkState(0 <= idx && idx < numEvents);
+ events[idx] = event;
}
- boolean isEmpty() {
+ void clear() {
- return numEvents == 0;
+ numEvents = 0;
}
+ void decreaseNumEvents() {
+ numEvents--;
+ }
+
int getNumEvents() {
return numEvents;
}
- void clear() {
+ int getLastEventIdx() {
+ return numEvents - 1;
+ }
- numEvents = 0;
+ boolean isFull() {
- }
+ Preconditions.checkState(numEvents <= size, "Batch Full: numEvents [%s] > size [%s]", numEvents, size);
+ return numEvents == size;
- PersistEvent get(int idx) {
- Preconditions.checkState(numEvents > 0 && 0 <= idx && idx < numEvents);
- return events[idx];
}
- void set(int idx, PersistEvent event) {
- Preconditions.checkState(0 <= idx && idx < numEvents);
- events[idx] = event;
- }
+ boolean isEmpty() {
+
+ return numEvents == 0;
- void decreaseNumEvents() {
- numEvents--;
}
void addCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context) {
@@ -96,11 +101,11 @@ public class Batch {
}
- void addAbort(long startTimestamp, boolean isRetry, Channel c, MonitoringContext context) {
+ void addAbort(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext context) {
Preconditions.checkState(!isFull(), "batch is full");
int index = numEvents++;
PersistEvent e = events[index];
- e.makePersistAbort(startTimestamp, isRetry, c, context);
+ e.makePersistAbort(startTimestamp, isCommitRetry, c, context);
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
index 8d4fd85..9bf9e89 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
@@ -31,7 +31,7 @@ public final class PersistEvent {
private Type type = null;
private Channel channel = null;
- private boolean isRetry = false;
+ private boolean isCommitRetry = false;
private long startTimestamp = 0L;
private long commitTimestamp = 0L;
private long lowWatermark = 0L;
@@ -46,11 +46,11 @@ public final class PersistEvent {
}
- void makePersistAbort(long startTimestamp, boolean isRetry, Channel c, MonitoringContext monCtx) {
+ void makePersistAbort(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext monCtx) {
this.type = Type.ABORT;
this.startTimestamp = startTimestamp;
- this.isRetry = isRetry;
+ this.isCommitRetry = isCommitRetry;
this.channel = c;
this.monCtx = monCtx;
@@ -83,9 +83,9 @@ public final class PersistEvent {
}
- boolean isRetry() {
+ boolean isCommitRetry() {
- return isRetry;
+ return isCommitRetry;
}
@@ -111,7 +111,7 @@ public final class PersistEvent {
public String toString() {
return Objects.toStringHelper(this)
.add("type", type)
- .add("isRetry", isRetry)
+ .add("isCommitRetry", isCommitRetry)
.add("ST", startTimestamp)
.add("CT", commitTimestamp)
.add("LWM", lowWatermark)
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
index f4e4a51..0b1a34f 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
@@ -26,7 +26,7 @@ interface PersistenceProcessor {
void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
throws Exception;
- void addAbortToBatch(long startTimestamp, boolean isRetry, Channel c, MonitoringContext monCtx) throws Exception;
+ void addAbortToBatch(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext monCtx) throws Exception;
void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
index cf7557a..079c8c4 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
@@ -39,12 +39,13 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
private final LeaseManagement leaseManager;
private final ReplyProcessor replyProcessor;
- private final RetryProcessor retryProc;
+ private final RetryProcessor retryProcessor;
private final CommitTable.Writer writer;
final Panicker panicker;
private final Timer flushTimer;
private final Histogram batchSizeHistogram;
+ private final Histogram flushedCommitEventsHistogram;
@Inject
PersistenceProcessorHandler(MetricsRegistry metrics,
@@ -52,7 +53,7 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
LeaseManagement leaseManager,
CommitTable commitTable,
ReplyProcessor replyProcessor,
- RetryProcessor retryProc,
+ RetryProcessor retryProcessor,
Panicker panicker)
throws InterruptedException, ExecutionException, IOException {
@@ -60,54 +61,63 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
this.leaseManager = leaseManager;
this.writer = commitTable.getWriter();
this.replyProcessor = replyProcessor;
- this.retryProc = retryProc;
+ this.retryProcessor = retryProcessor;
this.panicker = panicker;
- flushTimer = metrics.timer(name("tso", "persist", "flush"));
- batchSizeHistogram = metrics.histogram(name("tso", "persist", "batchsize"));
+ // Metrics in this component
+ flushTimer = metrics.timer(name("tso", "persist", "flush", "latency"));
+ flushedCommitEventsHistogram = metrics.histogram(name("tso", "persist", "flushed", "commits", "size"));
+ batchSizeHistogram = metrics.histogram(name("tso", "persist", "batch", "size"));
}
@Override
- public void onEvent(PersistenceProcessorImpl.PersistBatchEvent event) throws Exception {
+ public void onEvent(PersistenceProcessorImpl.PersistBatchEvent batchEvent) throws Exception {
- Batch batch = event.getBatch();
- for (int i=0; i < batch.getNumEvents(); ++i) {
- PersistEvent localEvent = batch.get(i);
+ int commitEventsToFlush = 0;
+ Batch batch = batchEvent.getBatch();
+ int numOfBatchedEvents = batch.getNumEvents();
+ batchSizeHistogram.update(numOfBatchedEvents);
+ for (int i=0; i < numOfBatchedEvents; ++i) {
+ PersistEvent event = batch.get(i);
- switch (localEvent.getType()) {
+ switch (event.getType()) {
case COMMIT:
- localEvent.getMonCtx().timerStart("commitPersistProcessor");
+ event.getMonCtx().timerStart("commitPersistProcessor");
// TODO: What happens when the IOException is thrown?
- writer.addCommittedTransaction(localEvent.getStartTimestamp(), localEvent.getCommitTimestamp());
+ writer.addCommittedTransaction(event.getStartTimestamp(), event.getCommitTimestamp());
+ commitEventsToFlush++;
break;
case ABORT:
break;
case TIMESTAMP:
- localEvent.getMonCtx().timerStart("timestampPersistProcessor");
+ event.getMonCtx().timerStart("timestampPersistProcessor");
break;
- default:
- throw new RuntimeException("Unknown event type: " + localEvent.getType().name());
}
}
- if (batch.getNumEvents() > 0) {
- flush(batch.getNumEvents());
- sendReplies(batch, event.getBatchSequence());
- }
+
+ // Flush and send the responses back to the client. WARNING: Before sending the responses, first we need
+ // to filter commit retries in the batch to disambiguate them.
+ flush(commitEventsToFlush);
+ filterAndDissambiguateClientRetries(batch);
+ replyProcessor.manageResponsesBatch(batchEvent.getBatchSequence(), batch);
+
}
- private void flush(int numBatchedEvents) {
+ void flush(int commitEventsToFlush) {
- commitSuicideIfNotMaster();
- try {
- long startFlushTimeInNs = System.nanoTime();
+ commitSuicideIfNotMaster();
+ try {
+ long startFlushTimeInNs = System.nanoTime();
+ if(commitEventsToFlush > 0) {
writer.flush();
- flushTimer.update(System.nanoTime() - startFlushTimeInNs);
- batchSizeHistogram.update(numBatchedEvents);
- } catch (IOException e) {
- panicker.panic("Error persisting commit batch", e);
}
- commitSuicideIfNotMaster(); // TODO Here, we can return the client responses before committing suicide
+ flushTimer.update(System.nanoTime() - startFlushTimeInNs);
+ flushedCommitEventsHistogram.update(commitEventsToFlush);
+ } catch (IOException e) {
+ panicker.panic("Error persisting commit batch", e);
+ }
+ commitSuicideIfNotMaster();
}
@@ -117,30 +127,33 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
}
}
- private void sendReplies(Batch batch, long batchSequence) {
-
- int i = 0;
- while (i < batch.getNumEvents()) {
- PersistEvent e = batch.get(i);
- if (e.getType() == PersistEvent.Type.ABORT && e.isRetry()) {
- retryProc.disambiguateRetryRequestHeuristically(e.getStartTimestamp(), e.getChannel(), e.getMonCtx());
- PersistEvent tmp = batch.get(i);
- //TODO: why assign it?
- batch.set(i, batch.get(batch.getNumEvents() - 1));
- batch.set(batch.getNumEvents() - 1, tmp);
- if (batch.getNumEvents() == 1) {
- batch.clear();
- replyProcessor.manageResponsesBatch(batchSequence, null);
- return;
- }
+ void filterAndDissambiguateClientRetries(Batch batch) {
+
+ int currentEventIdx = 0;
+ while (currentEventIdx <= batch.getLastEventIdx()) {
+ PersistEvent event = batch.get(currentEventIdx);
+ if (event.isCommitRetry()) {
+ retryProcessor.disambiguateRetryRequestHeuristically(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
+ // Swap the disambiguated event with the last batch event & decrease the # of remaining elems to process
+ swapBatchElements(batch, currentEventIdx, batch.getLastEventIdx());
batch.decreaseNumEvents();
- continue;
+ if (batch.isEmpty()) {
+ break; // We're OK to call now the reply processor
+ } else {
+ continue; // Otherwise we continue checking for retries from the new event in the current position
+ }
+ } else {
+ currentEventIdx++; // Let's check if the next event was a retry
}
- i++;
}
- replyProcessor.manageResponsesBatch(batchSequence, batch);
+ }
+ private void swapBatchElements(Batch batch, int firstIdx, int lastIdx) {
+ PersistEvent tmpEvent = batch.get(firstIdx);
+ PersistEvent lastEventInBatch = batch.get(lastIdx);
+ batch.set(firstIdx, lastEventInBatch);
+ batch.set(lastIdx, tmpEvent);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index 761d30b..0da0c91 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -128,10 +128,10 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
}
@Override
- public void addAbortToBatch(long startTimestamp, boolean isRetry, Channel c, MonitoringContext context)
+ public void addAbortToBatch(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext context)
throws Exception {
- currentBatch.addAbort(startTimestamp, isRetry, c, context);
+ currentBatch.addAbort(startTimestamp, isCommitRetry, c, context);
if (currentBatch.isFull()) {
triggerCurrentBatchFlush();
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index c1709ef..91df214 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -90,42 +90,37 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
}
- private void handleReplyBatchEvent(ReplyBatchEvent event) throws Exception {
+ private void handleReplyBatchEvent(ReplyBatchEvent replyBatchEvent) throws Exception {
String name;
- Batch batch = event.getBatch();
- for (int i = 0; batch != null && i < batch.getNumEvents(); ++i) {
- PersistEvent localEvent = batch.get(i);
+ Batch batch = replyBatchEvent.getBatch();
+ for (int i = 0; i < batch.getNumEvents(); i++) {
+ PersistEvent event = batch.get(i);
- switch (localEvent.getType()) {
+ switch (event.getType()) {
case COMMIT:
name = "commitReplyProcessor";
- localEvent.getMonCtx().timerStart(name);
- sendCommitResponse(localEvent.getStartTimestamp(), localEvent.getCommitTimestamp(), localEvent.getChannel());
- localEvent.getMonCtx().timerStop(name);
- break;
+ event.getMonCtx().timerStart(name);
+ sendCommitResponse(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel());
+ event.getMonCtx().timerStop(name);
+ break;
case ABORT:
name = "abortReplyProcessor";
- localEvent.getMonCtx().timerStart(name);
- sendAbortResponse(localEvent.getStartTimestamp(), localEvent.getChannel());
- localEvent.getMonCtx().timerStop(name);
+ event.getMonCtx().timerStart(name);
+ sendAbortResponse(event.getStartTimestamp(), event.getChannel());
+ event.getMonCtx().timerStop(name);
break;
case TIMESTAMP:
name = "timestampReplyProcessor";
- localEvent.getMonCtx().timerStart(name);
- sendTimestampResponse(localEvent.getStartTimestamp(), localEvent.getChannel());
- localEvent.getMonCtx().timerStop(name);
- break;
- default:
- LOG.error("Unknown event {}", localEvent.getType());
+ event.getMonCtx().timerStart(name);
+ sendTimestampResponse(event.getStartTimestamp(), event.getChannel());
+ event.getMonCtx().timerStop(name);
break;
}
- localEvent.getMonCtx().publish();
+ event.getMonCtx().publish();
}
- if (batch != null) {
- batchPool.returnObject(batch);
- }
+ batchPool.returnObject(batch);
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
index 0ad5c08..372733e 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
@@ -38,7 +38,6 @@ import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestEvent>, RequestProcessor, TimeoutHandler {
@@ -163,7 +162,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
long startTimestamp = event.getStartTimestamp();
Iterable<Long> writeSet = event.writeSet();
- boolean isRetry = event.isRetry();
+ boolean isCommitRetry = event.isCommitRetry();
Channel c = event.getChannel();
boolean txCanCommit;
@@ -207,7 +206,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());
} else { // add it to the aborted list
- persistProc.addAbortToBatch(startTimestamp, isRetry, c, event.getMonCtx());
+ persistProc.addAbortToBatch(startTimestamp, isCommitRetry, c, event.getMonCtx());
}
}
@@ -221,7 +220,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
private Type type = null;
private Channel channel = null;
- private boolean isRetry = false;
+ private boolean isCommitRetry = false;
private long startTimestamp = 0;
private MonitoringContext monCtx;
private long numCells = 0;
@@ -246,7 +245,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
e.type = Type.COMMIT;
e.channel = c;
e.startTimestamp = startTimestamp;
- e.isRetry = isRetry;
+ e.isCommitRetry = isRetry;
if (writeSet.size() > MAX_INLINE) {
e.numCells = writeSet.size();
e.writeSetAsCollection = writeSet;
@@ -315,8 +314,8 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
}
- boolean isRetry() {
- return isRetry;
+ boolean isCommitRetry() {
+ return isCommitRetry;
}
final static EventFactory<RequestEvent> EVENT_FACTORY = new EventFactory<RequestEvent>() {
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/3659b96c/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
new file mode 100644
index 0000000..4dcc2b0
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.metrics.NullMetricsProvider;
+import org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent;
+import org.jboss.netty.channel.Channel;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+public class TestPersistenceProcessorHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestPersistenceProcessorHandler.class);
+
+ private static final int BATCH_ID = 0;
+ private static final int BATCH_SIZE = 6;
+ private static final long BATCH_SEQUENCE = 0;
+
+ private static final long FIRST_ST = 0L;
+ private static final long FIRST_CT = 1L;
+ private static final long SECOND_ST = 2L;
+ private static final long SECOND_CT = 3L;
+ private static final long THIRD_ST = 4L;
+ private static final long THIRD_CT = 5L;
+ private static final long FOURTH_ST = 6L;
+ private static final long FOURTH_CT = 7L;
+ private static final long FIFTH_ST = 8L;
+ private static final long FIFTH_CT = 9L;
+ private static final long SIXTH_ST = 10L;
+
+ @Mock
+ private CommitTable.Writer mockWriter;
+ @Mock
+ private CommitTable.Client mockClient;
+ @Mock
+ private LeaseManager leaseManager;
+ @Mock
+ private ReplyProcessor replyProcessor;
+ @Mock
+ private RetryProcessor retryProcessor;
+ @Mock
+ private Panicker panicker;
+
+ private CommitTable commitTable;
+
+ private MetricsRegistry metrics;
+
+ // Component under test
+ private PersistenceProcessorHandler persistenceHandler;
+
+ @BeforeMethod(alwaysRun = true, timeOut = 30_000)
+ public void initMocksAndComponents() throws Exception {
+
+ MockitoAnnotations.initMocks(this);
+
+ // Configure null metrics provider
+ metrics = new NullMetricsProvider();
+
+ // Configure commit table to return the mocked writer and client
+ commitTable = new CommitTable() {
+ @Override
+ public Writer getWriter() {
+ return mockWriter;
+ }
+
+ @Override
+ public Client getClient() {
+ return mockClient;
+ }
+ };
+
+ // Simulate we're master for most of the tests
+ doReturn(true).when(leaseManager).stillInLeasePeriod();
+
+ persistenceHandler = spy(new PersistenceProcessorHandler(metrics,
+ "localhost:1234",
+ leaseManager,
+ commitTable,
+ replyProcessor,
+ retryProcessor,
+ panicker));
+
+ }
+
+ @AfterMethod
+ void afterMethod() {
+ Mockito.reset(mockWriter);
+ }
+
+ @Test(timeOut = 10_000)
+ public void testProcessingOfEmptyBatchPersistEvent() throws Exception {
+
+ // Prepare test batch
+ Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+ PersistBatchEvent batchEvent = new PersistBatchEvent();
+ PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+ persistenceHandler.onEvent(batchEvent);
+
+ verify(persistenceHandler, times(1)).flush(eq(0));
+ verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
+ verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+ verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+ assertTrue(batch.isEmpty());
+
+ }
+
+ @Test(timeOut = 10_000)
+ public void testProcessingOfBatchPersistEventWithASingleTimestampEvent() throws Exception {
+
+ // Prepare test batch
+ Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+ batch.addTimestamp(FIRST_ST, null, mock(MonitoringContext.class));
+ PersistBatchEvent batchEvent = new PersistBatchEvent();
+ PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+ persistenceHandler.onEvent(batchEvent);
+
+ verify(persistenceHandler, times(1)).flush(eq(0));
+ verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
+ verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+ verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+ assertEquals(batch.getNumEvents(), 1);
+ assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
+
+ }
+
+ @Test(timeOut = 10_000)
+ public void testProcessingOfBatchPersistEventWithASingleCommitEvent() throws Exception {
+
+ // Prepare test batch
+ Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+ batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+ PersistBatchEvent batchEvent = new PersistBatchEvent();
+ PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+ persistenceHandler.onEvent(batchEvent);
+
+ verify(persistenceHandler, times(1)).flush(eq(1));
+ verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
+ verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+ verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+ assertEquals(batch.getNumEvents(), 1);
+ assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
+ assertEquals(batch.get(0).getCommitTimestamp(), FIRST_CT);
+
+ }
+
+ @Test(timeOut = 10_000)
+ public void testProcessingOfBatchPersistEventWithASingleAbortEventNoRetry() throws Exception {
+
+ final boolean IS_RETRY = false;
+
+ // Prepare test batch
+ Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+ batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+ PersistBatchEvent batchEvent = new PersistBatchEvent();
+ PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+ persistenceHandler.onEvent(batchEvent);
+
+ verify(persistenceHandler, times(1)).flush(eq(0));
+ verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
+ verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+ verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+ assertEquals(batch.getNumEvents(), 1);
+ assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
+
+ }
+
+ @Test(timeOut = 10_000)
+ public void testProcessingOfBatchPersistEventWithASingleAbortEventWithRetry() throws Exception {
+
+ final boolean IS_RETRY = true;
+
+ // Prepare test batch
+ Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+ batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+ PersistBatchEvent batchEvent = new PersistBatchEvent();
+ PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+ // Call process method
+ persistenceHandler.onEvent(batchEvent);
+
+ verify(persistenceHandler, times(1)).flush(eq(0));
+ verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(batch);
+ verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class));
+ verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+ assertEquals(batch.getNumEvents(), 0);
+
+ }
+
+ @Test(timeOut = 10_000)
+ public void testProcessingOfBatchPersistEventWith2EventsCommitAndAbortWithRetry() throws Exception {
+
+ final boolean IS_RETRY = true;
+
+ // Prepare test batch
+ Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+ batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+ batch.addAbort(SECOND_ST, IS_RETRY, null, mock(MonitoringContext.class));
+ PersistBatchEvent batchEvent = new PersistBatchEvent();
+ PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+ // Initial assertion
+ assertEquals(batch.getNumEvents(), 2);
+
+ // Call process method
+ persistenceHandler.onEvent(batchEvent);
+
+ verify(persistenceHandler, times(1)).flush(eq(1));
+ verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
+ verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class));
+ verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+ assertEquals(batch.getNumEvents(), 1);
+ assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
+ assertEquals(batch.get(0).getCommitTimestamp(), FIRST_CT);
+
+ }
+
+ @Test(timeOut = 10_000)
+ public void testProcessingOfBatchPersistEventWith2EventsAbortWithRetryAndCommit() throws Exception {
+ // ------------------------------------------------------------------------------------------------------------
+ // Same test as testProcessingOfBatchPersistEventWith2EventsCommitAndAbortWithRetry but swapped events
+ // ------------------------------------------------------------------------------------------------------------
+
+ final boolean IS_RETRY = true;
+
+ // Prepare test batch
+ Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+ batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+ batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContext.class));
+ PersistBatchEvent batchEvent = new PersistBatchEvent();
+ PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+ // Initial assertion
+ assertEquals(batch.getNumEvents(), 2);
+
+ // Call process method
+ persistenceHandler.onEvent(batchEvent);
+
+ verify(persistenceHandler, times(1)).flush(eq(1));
+ verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
+ verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class));
+ verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+ assertEquals(batch.getNumEvents(), 1);
+ assertEquals(batch.get(0).getStartTimestamp(), SECOND_ST);
+ assertEquals(batch.get(0).getCommitTimestamp(), SECOND_CT);
+
+ }
+
+ @Test(timeOut = 10_000)
+ public void testProcessingOfBatchPersistEventWith2AbortWithRetryEvents() throws Exception {
+
+ final boolean IS_RETRY = true;
+
+ // Prepare test batch
+ Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+ batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+ batch.addAbort(SECOND_ST, IS_RETRY, null, mock(MonitoringContext.class));
+ PersistBatchEvent batchEvent = new PersistBatchEvent();
+ PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+ // Initial assertion
+ assertEquals(batch.getNumEvents(), 2);
+
+ // Call process method
+ persistenceHandler.onEvent(batchEvent);
+
+ verify(persistenceHandler, times(1)).flush(eq(0));
+ verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
+ verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(FIRST_ST), any(Channel.class), any(MonitoringContext.class));
+ verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class));
+ verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+ assertEquals(batch.getNumEvents(), 0);
+
+ }
+
+ @Test(timeOut = 10_000)
+ public void testProcessingOfBatchPersistEventWith2NonRetryAbortEvents() throws Exception {
+
+ final boolean IS_RETRY = false;
+
+ // Prepare test batch
+ Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+ batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+ batch.addAbort(SECOND_ST, IS_RETRY, null, mock(MonitoringContext.class));
+ PersistBatchEvent batchEvent = new PersistBatchEvent();
+ PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+ // Initial assertion
+ assertEquals(batch.getNumEvents(), 2);
+
+ // Call process method
+ persistenceHandler.onEvent(batchEvent);
+
+ verify(persistenceHandler, times(1)).flush(eq(0));
+ verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
+ verify(retryProcessor, never()).disambiguateRetryRequestHeuristically(anyLong(), any(Channel.class), any(MonitoringContext.class));
+ verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+ assertEquals(batch.getNumEvents(), 2);
+ assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
+ assertEquals(batch.get(1).getStartTimestamp(), SECOND_ST);
+
+ }
+
+
+ @Test(timeOut = 10_000)
+ public void testProcessingOfBatchPersistEventWithMultipleRetryAndNonRetryEvents() throws Exception {
+
+ // Prepare test batch
+ Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+
+ batch.addTimestamp(FIRST_ST, null, mock(MonitoringContext.class));
+ batch.addAbort(SECOND_ST, true, null, mock(MonitoringContext.class));
+ batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContext.class));
+ batch.addAbort(FOURTH_ST, false, null, mock(MonitoringContext.class));
+ batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContext.class));
+ batch.addAbort(SIXTH_ST, true, null, mock(MonitoringContext.class));
+ PersistBatchEvent batchEvent = new PersistBatchEvent();
+ PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+ // Initial assertion
+ assertEquals(batch.getNumEvents(), 6);
+
+ // Call process method
+ persistenceHandler.onEvent(batchEvent);
+
+ verify(persistenceHandler, times(1)).flush(2); // 2 commits to flush
+ verify(persistenceHandler, times(1)).filterAndDissambiguateClientRetries(eq(batch));
+ verify(retryProcessor, times(1)).disambiguateRetryRequestHeuristically(eq(SECOND_ST), any(Channel.class), any(MonitoringContext.class));
+ verify(replyProcessor, times(1)).manageResponsesBatch(eq(BATCH_SEQUENCE), eq(batch));
+ assertEquals(batch.getNumEvents(), 4);
+ assertEquals(batch.get(0).getStartTimestamp(), FIRST_ST);
+ assertEquals(batch.get(1).getStartTimestamp(), FIFTH_ST);
+ assertEquals(batch.get(1).getCommitTimestamp(), FIFTH_CT);
+ assertEquals(batch.get(2).getStartTimestamp(), THIRD_ST);
+ assertEquals(batch.get(2).getCommitTimestamp(), THIRD_CT);
+ assertEquals(batch.get(3).getStartTimestamp(), FOURTH_ST);
+
+ }
+
+ @Test(timeOut = 10_000)
+ public void testPanicPersistingEvents() throws Exception {
+
+ // User the real panicker
+ Panicker panicker = spy(new RuntimeExceptionPanicker());
+ persistenceHandler = spy(new PersistenceProcessorHandler(metrics,
+ "localhost:1234",
+ leaseManager,
+ commitTable,
+ replyProcessor,
+ retryProcessor,
+ panicker));
+
+ // Prepare test batch
+ Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+ batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+ PersistBatchEvent batchEvent = new PersistBatchEvent();
+ PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+ doThrow(IOException.class).when(mockWriter).flush();
+
+ try {
+ persistenceHandler.onEvent(batchEvent);
+ fail();
+ } catch (RuntimeException re) {
+ // Expected
+ }
+
+ verify(persistenceHandler, times(1)).flush(1);
+ verify(panicker, times(1)).panic(eq("Error persisting commit batch"), any(IOException.class));
+ verify(persistenceHandler, never()).filterAndDissambiguateClientRetries(any(Batch.class));
+ verify(replyProcessor, never()).manageResponsesBatch(anyLong(), any(Batch.class));
+
+ }
+
+ @Test(timeOut = 10_000)
+ public void testPanicBecauseMasterLosesMastership() throws Exception {
+
+ // ------------------------------------------------------------------------------------------------------------
+ // 1) Test panic before flushing
+ // ------------------------------------------------------------------------------------------------------------
+
+ // Simulate we lose mastership BEFORE flushing
+ doReturn(false).when(leaseManager).stillInLeasePeriod();
+
+ // User the real panicker
+ Panicker panicker = spy(new RuntimeExceptionPanicker());
+ persistenceHandler = spy(new PersistenceProcessorHandler(metrics,
+ "localhost:1234",
+ leaseManager,
+ commitTable,
+ replyProcessor,
+ retryProcessor,
+ panicker));
+
+ // Prepare test batch
+ Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
+ batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+ PersistBatchEvent batchEvent = new PersistBatchEvent();
+ PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+ try {
+ persistenceHandler.onEvent(batchEvent);
+ fail();
+ } catch (RuntimeException re) {
+ // Expected
+ }
+ verify(persistenceHandler, times(1)).flush(eq(1));
+ verify(mockWriter, never()).flush();
+ verify(panicker, times(1)).panic(eq("Replica localhost:1234 lost mastership whilst flushing data. Committing suicide"), any(IOException.class));
+ verify(persistenceHandler, never()).filterAndDissambiguateClientRetries(any(Batch.class));
+ verify(replyProcessor, never()).manageResponsesBatch(anyLong(), any(Batch.class));
+
+ // ------------------------------------------------------------------------------------------------------------
+ // 2) Test panic after flushing
+ // ------------------------------------------------------------------------------------------------------------
+
+ // Simulate we lose mastership AFTER flushing
+ doReturn(true).doReturn(false).when(leaseManager).stillInLeasePeriod();
+
+ // User the real panicker
+ panicker = spy(new RuntimeExceptionPanicker());
+ persistenceHandler = spy(new PersistenceProcessorHandler(metrics,
+ "localhost:1234",
+ leaseManager,
+ commitTable,
+ replyProcessor,
+ retryProcessor,
+ panicker));
+
+ // Prepare test batch
+ batch = new Batch(BATCH_ID, BATCH_SIZE);
+ batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
+ batchEvent = new PersistBatchEvent();
+ PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
+
+ try {
+ persistenceHandler.onEvent(batchEvent);
+ fail();
+ } catch (RuntimeException re) {
+ // Expected
+ }
+ verify(persistenceHandler, times(1)).flush(eq(1));
+ verify(mockWriter, times(1)).flush();
+ verify(panicker, times(1)).panic(eq("Replica localhost:1234 lost mastership whilst flushing data. Committing suicide"), any(IOException.class));
+ verify(persistenceHandler, never()).filterAndDissambiguateClientRetries(any(Batch.class));
+ verify(replyProcessor, never()).manageResponsesBatch(anyLong(), any(Batch.class));
+
+ }
+
+}
[6/7] incubator-omid git commit: Separate commit retry events from
aborts in batch contents
Posted by fp...@apache.org.
Separate commit retry events from aborts in batch contents
Before, the Abort events were re-used for holding what in reality are
commit retries, what made the code criptic in many places. Now commit
retries are treated as first class events.
Change-Id: Ib9c9f424c3b7b94a02d5e985b388ef74f13c87da
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/8954a342
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/8954a342
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/8954a342
Branch: refs/heads/master
Commit: 8954a3421803b70aa25de2d8ee574ca4f3ed05e4
Parents: 3659b96
Author: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Authored: Mon May 16 15:07:59 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 18 12:26:31 2016 -0700
----------------------------------------------------------------------
.../main/java/org/apache/omid/tso/Batch.java | 13 +++++-
.../java/org/apache/omid/tso/PersistEvent.java | 30 +++++--------
.../apache/omid/tso/PersistenceProcessor.java | 5 ++-
.../omid/tso/PersistenceProcessorHandler.java | 3 +-
.../omid/tso/PersistenceProcessorImpl.java | 12 ++++-
.../org/apache/omid/tso/ReplyProcessorImpl.java | 12 +++--
.../apache/omid/tso/RequestProcessorImpl.java | 10 ++++-
.../java/org/apache/omid/tso/TestBatch.java | 16 ++++---
.../tso/TestPersistenceProcessorHandler.java | 46 ++++++++------------
.../apache/omid/tso/TestRequestProcessor.java | 6 +--
10 files changed, 84 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/Batch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/Batch.java b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
index 06ddc71..496bca2 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/Batch.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
@@ -101,11 +101,20 @@ public class Batch {
}
- void addAbort(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext context) {
+ public void addCommitRetry(long startTimestamp, Channel c, MonitoringContext context) {
+
+ Preconditions.checkState(!isFull(), "batch is full");
+ int index = numEvents++;
+ PersistEvent e = events[index];
+ e.makeCommitRetry(startTimestamp, c, context);
+
+ }
+
+ void addAbort(long startTimestamp, Channel c, MonitoringContext context) {
Preconditions.checkState(!isFull(), "batch is full");
int index = numEvents++;
PersistEvent e = events[index];
- e.makePersistAbort(startTimestamp, isCommitRetry, c, context);
+ e.makePersistAbort(startTimestamp, c, context);
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
index 9bf9e89..db58677 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
@@ -25,16 +25,14 @@ public final class PersistEvent {
private MonitoringContext monCtx;
enum Type {
- TIMESTAMP, COMMIT, ABORT
+ TIMESTAMP, COMMIT, ABORT, COMMIT_RETRY
}
private Type type = null;
private Channel channel = null;
- private boolean isCommitRetry = false;
private long startTimestamp = 0L;
private long commitTimestamp = 0L;
- private long lowWatermark = 0L;
void makePersistCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
@@ -46,11 +44,19 @@ public final class PersistEvent {
}
- void makePersistAbort(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext monCtx) {
+ void makeCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) {
+
+ this.type = Type.COMMIT_RETRY;
+ this.startTimestamp = startTimestamp;
+ this.channel = c;
+ this.monCtx = monCtx;
+
+ }
+
+ void makePersistAbort(long startTimestamp, Channel c, MonitoringContext monCtx) {
this.type = Type.ABORT;
this.startTimestamp = startTimestamp;
- this.isCommitRetry = isCommitRetry;
this.channel = c;
this.monCtx = monCtx;
@@ -83,12 +89,6 @@ public final class PersistEvent {
}
- boolean isCommitRetry() {
-
- return isCommitRetry;
-
- }
-
long getStartTimestamp() {
return startTimestamp;
@@ -101,20 +101,12 @@ public final class PersistEvent {
}
- long getLowWatermark() {
-
- return lowWatermark;
-
- }
-
@Override
public String toString() {
return Objects.toStringHelper(this)
.add("type", type)
- .add("isCommitRetry", isCommitRetry)
.add("ST", startTimestamp)
.add("CT", commitTimestamp)
- .add("LWM", lowWatermark)
.toString();
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
index 0b1a34f..07893f6 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
@@ -26,12 +26,13 @@ interface PersistenceProcessor {
void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
throws Exception;
- void addAbortToBatch(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext monCtx) throws Exception;
+ void addCommitRetryToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
+
+ void addAbortToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
void triggerCurrentBatchFlush() throws Exception;
Future<Void> persistLowWatermark(long lowWatermark);
-
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
index 079c8c4..bfe0036 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
@@ -30,6 +30,7 @@ import java.io.IOException;
import java.util.concurrent.ExecutionException;
import static com.codahale.metrics.MetricRegistry.name;
+import static org.apache.omid.tso.PersistEvent.Type.*;
public class PersistenceProcessorHandler implements WorkHandler<PersistenceProcessorImpl.PersistBatchEvent> {
@@ -132,7 +133,7 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
int currentEventIdx = 0;
while (currentEventIdx <= batch.getLastEventIdx()) {
PersistEvent event = batch.get(currentEventIdx);
- if (event.isCommitRetry()) {
+ if (event.getType() == COMMIT_RETRY) {
retryProcessor.disambiguateRetryRequestHeuristically(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
// Swap the disambiguated event with the last batch event & decrease the # of remaining elems to process
swapBatchElements(batch, currentEventIdx, batch.getLastEventIdx());
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index 0da0c91..4fe21d2 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -128,10 +128,18 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
}
@Override
- public void addAbortToBatch(long startTimestamp, boolean isCommitRetry, Channel c, MonitoringContext context)
+ public void addCommitRetryToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+ currentBatch.addCommitRetry(startTimestamp, c, monCtx);
+ if (currentBatch.isFull()) {
+ triggerCurrentBatchFlush();
+ }
+ }
+
+ @Override
+ public void addAbortToBatch(long startTimestamp, Channel c, MonitoringContext context)
throws Exception {
- currentBatch.addAbort(startTimestamp, isCommitRetry, c, context);
+ currentBatch.addAbort(startTimestamp, c, context);
if (currentBatch.isFull()) {
triggerCurrentBatchFlush();
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index 91df214..a46c240 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.omid.tso;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.lmax.disruptor.BatchEventProcessor;
@@ -50,9 +51,11 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
private final RingBuffer<ReplyBatchEvent> replyRing;
- private AtomicLong nextIDToHandle = new AtomicLong();
+ @VisibleForTesting
+ AtomicLong nextIDToHandle = new AtomicLong();
- private PriorityQueue<ReplyBatchEvent> futureEvents;
+ @VisibleForTesting
+ PriorityQueue<ReplyBatchEvent> futureEvents;
// Metrics
private final Meter abortMeter;
@@ -90,7 +93,8 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
}
- private void handleReplyBatchEvent(ReplyBatchEvent replyBatchEvent) throws Exception {
+ @VisibleForTesting
+ void handleReplyBatchEvent(ReplyBatchEvent replyBatchEvent) throws Exception {
String name;
Batch batch = replyBatchEvent.getBatch();
@@ -116,6 +120,8 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
sendTimestampResponse(event.getStartTimestamp(), event.getChannel());
event.getMonCtx().timerStop(name);
break;
+ case COMMIT_RETRY:
+ throw new RuntimeException("COMMIT_RETRY events must be filtered before this step. Event: {}");
}
event.getMonCtx().publish();
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
index 372733e..32f5b8c 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
@@ -205,8 +205,14 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
}
persistProc.addCommitToBatch(startTimestamp, commitTimestamp, c, event.getMonCtx());
- } else { // add it to the aborted list
- persistProc.addAbortToBatch(startTimestamp, isCommitRetry, c, event.getMonCtx());
+ } else {
+
+ if (isCommitRetry) { // Re-check if it was already committed but the client retried due to a lag replying
+ persistProc.addCommitRetryToBatch(startTimestamp, c, event.getMonCtx());
+ } else {
+ persistProc.addAbortToBatch(startTimestamp, c, event.getMonCtx());
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
index c472606..448eeaa 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
@@ -70,12 +70,14 @@ public class TestBatch {
// Test when filling the batch with different types of events, that becomes full
for (int i = 0; i < BATCH_SIZE; i++) {
- if (i % 3 == 0) {
+ if (i % 4 == 0) {
batch.addTimestamp(ANY_ST, channel, monCtx);
- } else if (i % 3 == 1) {
+ } else if (i % 4 == 1) {
batch.addCommit(ANY_ST, ANY_CT, channel, monCtx);
+ } else if (i % 4 == 2) {
+ batch.addCommitRetry(ANY_ST, channel, monCtx);
} else {
- batch.addAbort(ANY_ST, false, channel, monCtx);
+ batch.addAbort(ANY_ST, channel, monCtx);
}
}
assertFalse(batch.isEmpty(), "Batch should contain elements");
@@ -95,7 +97,8 @@ public class TestBatch {
// Check the first 3 events and the last one correspond to the filling done above
assertTrue(batch.get(0).getType().equals(PersistEvent.Type.TIMESTAMP));
assertTrue(batch.get(1).getType().equals(PersistEvent.Type.COMMIT));
- assertTrue(batch.get(2).getType().equals(PersistEvent.Type.ABORT));
+ assertTrue(batch.get(2).getType().equals(PersistEvent.Type.COMMIT_RETRY));
+ assertTrue(batch.get(3).getType().equals(PersistEvent.Type.ABORT));
// Set a new value for last element in Batch and check we obtain the right result
batch.decreaseNumEvents();
@@ -137,10 +140,11 @@ public class TestBatch {
// Put some elements in the batch...
batch.addTimestamp(ANY_ST, channel, monCtx);
batch.addCommit(ANY_ST, ANY_CT, channel, monCtx);
- batch.addAbort(ANY_ST, false, channel, monCtx);
+ batch.addCommitRetry(ANY_ST, channel, monCtx);
+ batch.addAbort(ANY_ST, channel, monCtx);
assertFalse(batch.isEmpty(), "Batch should contain elements");
assertFalse(batch.isFull(), "Batch should NOT be full");
- assertEquals(batch.getNumEvents(), 3, "Num events should be 3");
+ assertEquals(batch.getNumEvents(), 4, "Num events should be 4");
// ... and passivate the object through the factory. It should reset the state of the batch
factory.passivateObject(pooledBatch);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
index 4dcc2b0..d60d019 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
@@ -185,11 +185,9 @@ public class TestPersistenceProcessorHandler {
@Test(timeOut = 10_000)
public void testProcessingOfBatchPersistEventWithASingleAbortEventNoRetry() throws Exception {
- final boolean IS_RETRY = false;
-
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+ batch.addAbort(FIRST_ST, null, mock(MonitoringContext.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
persistenceHandler.onEvent(batchEvent);
@@ -204,13 +202,11 @@ public class TestPersistenceProcessorHandler {
}
@Test(timeOut = 10_000)
- public void testProcessingOfBatchPersistEventWithASingleAbortEventWithRetry() throws Exception {
-
- final boolean IS_RETRY = true;
+ public void testProcessingOfBatchPersistEventWithASingleCommitRetryEvent() throws Exception {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+ batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -226,14 +222,12 @@ public class TestPersistenceProcessorHandler {
}
@Test(timeOut = 10_000)
- public void testProcessingOfBatchPersistEventWith2EventsCommitAndAbortWithRetry() throws Exception {
-
- final boolean IS_RETRY = true;
+ public void testProcessingOfBatchPersistEventWith2EventsCommitAndCommitRetry() throws Exception {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContext.class));
- batch.addAbort(SECOND_ST, IS_RETRY, null, mock(MonitoringContext.class));
+ batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -254,16 +248,14 @@ public class TestPersistenceProcessorHandler {
}
@Test(timeOut = 10_000)
- public void testProcessingOfBatchPersistEventWith2EventsAbortWithRetryAndCommit() throws Exception {
+ public void testProcessingOfBatchPersistEventWith2EventsCommitRetryAndCommit() throws Exception {
// ------------------------------------------------------------------------------------------------------------
- // Same test as testProcessingOfBatchPersistEventWith2EventsCommitAndAbortWithRetry but swapped events
+ // Same test as testProcessingOfBatchPersistEventWith2EventsCommitAndCommitRetry but swapped events
// ------------------------------------------------------------------------------------------------------------
- final boolean IS_RETRY = true;
-
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
+ batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContext.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -285,14 +277,12 @@ public class TestPersistenceProcessorHandler {
}
@Test(timeOut = 10_000)
- public void testProcessingOfBatchPersistEventWith2AbortWithRetryEvents() throws Exception {
-
- final boolean IS_RETRY = true;
+ public void testProcessingOfBatchPersistEventWith2CommitRetryEvents() throws Exception {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
- batch.addAbort(SECOND_ST, IS_RETRY, null, mock(MonitoringContext.class));
+ batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContext.class));
+ batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -312,14 +302,12 @@ public class TestPersistenceProcessorHandler {
}
@Test(timeOut = 10_000)
- public void testProcessingOfBatchPersistEventWith2NonRetryAbortEvents() throws Exception {
-
- final boolean IS_RETRY = false;
+ public void testProcessingOfBatchPersistEventWith2AbortEvents() throws Exception {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addAbort(FIRST_ST, IS_RETRY, null, mock(MonitoringContext.class));
- batch.addAbort(SECOND_ST, IS_RETRY, null, mock(MonitoringContext.class));
+ batch.addAbort(FIRST_ST, null, mock(MonitoringContext.class));
+ batch.addAbort(SECOND_ST, null, mock(MonitoringContext.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -347,11 +335,11 @@ public class TestPersistenceProcessorHandler {
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
batch.addTimestamp(FIRST_ST, null, mock(MonitoringContext.class));
- batch.addAbort(SECOND_ST, true, null, mock(MonitoringContext.class));
+ batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContext.class));
batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContext.class));
- batch.addAbort(FOURTH_ST, false, null, mock(MonitoringContext.class));
+ batch.addAbort(FOURTH_ST, null, mock(MonitoringContext.class));
batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContext.class));
- batch.addAbort(SIXTH_ST, true, null, mock(MonitoringContext.class));
+ batch.addCommitRetry(SIXTH_ST, null, mock(MonitoringContext.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/8954a342/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
index 5b17d1e..8f2b06e 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
@@ -112,7 +112,7 @@ public class TestRequestProcessor {
List<Long> writeSet = Lists.newArrayList(1L, 20L, 203L);
requestProc.commitRequest(firstTS - 1, writeSet, false, null, new MonitoringContext(metrics));
- verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - 1), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+ verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - 1), any(Channel.class), any(MonitoringContext.class));
requestProc.commitRequest(firstTS, writeSet, false, null, new MonitoringContext(metrics));
ArgumentCaptor<Long> commitTScapture = ArgumentCaptor.forClass(Long.class);
@@ -136,7 +136,7 @@ public class TestRequestProcessor {
requestProc.commitRequest(thirdTS, writeSet, false, null, new MonitoringContext(metrics));
verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContext.class));
requestProc.commitRequest(secondTS, writeSet, false, null, new MonitoringContext(metrics));
- verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+ verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), any(Channel.class), any(MonitoringContext.class));
}
@@ -159,7 +159,7 @@ public class TestRequestProcessor {
// ...check that the transaction is aborted when trying to commit
requestProc.commitRequest(startTS, writeSet, false, null, new MonitoringContext(metrics));
- verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+ verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), any(Channel.class), any(MonitoringContext.class));
}
[2/7] incubator-omid git commit: [ci skip] Add NOTICE
Posted by fp...@apache.org.
[ci skip] Add NOTICE
Change-Id: I386f5fa20d2578dc87a9b1eb93441c9cea4eb06b
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/fb63cf39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/fb63cf39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/fb63cf39
Branch: refs/heads/master
Commit: fb63cf3940c216f6eb175f45e067be5f64c9097f
Parents: fcc56bb
Author: Igor Katkov <ka...@yahoo-inc.com>
Authored: Wed May 18 11:45:04 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 18 12:26:31 2016 -0700
----------------------------------------------------------------------
NOTICE | 23 +++++++++++++++++++++++
1 file changed, 23 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/fb63cf39/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
new file mode 100644
index 0000000..7e71dbc
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1,23 @@
+Apache Omid
+Copyright 2016 The Apache Software Foundation.
+
+This product includes software developed at The Apache Software Foundation (http://www.apache.org/).
+
+This distribution has a binary dependency on:
+
+Google Guava v11.0.2 (https://github.com/google/guava) [Apache 2.0]
+Guice v3.0 (https://github.com/google/guice/wiki) [Apache 2.0]
+Testng v6.8.8 (http://testng.org) [Apache 2.0]
+SLF4J (http://www.slf4j.org/) v1.7.7 [MIT License]
+Netty (http://netty.io) v3.2.6.Final [Apache 2.0]
+Google Protocol Buffers v2.5.0 (https://developers.google.com/protocol-buffers/) [BSD License]
+Mockito (http://mockito.org/) v1.9.5 [MIT License]
+LMAX Disruptor v3.2.0 (https://lmax-exchange.github.io/disruptor/) [Apache 2.0]
+Coda Hale/Yammer.com Dropwizard Metrics v3.0.1 (http://metrics.dropwizard.io/3.1.0/) [Apache 2.0]
+JCommander v1.35 (http://jcommander.org/) [Apache 2.0]
+Hamcrest v1.3 (http://hamcrest.org/JavaHamcrest/) [BSD License]
+Snakeyml v1.11 (https://bitbucket.org/asomov/snakeyaml) [Apache 2.0]
+Google FindBugs v3.0.1 (http://findbugs.sourceforge.net/) [Lesser GNU General Public License]
+
+
+