You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by ik...@apache.org on 2016/04/21 02:02:33 UTC

[16/52] [abbrv] incubator-omid git commit: Move com.yahoo -> org.apache

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
new file mode 100644
index 0000000..34ced79
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.metrics.NullMetricsProvider;
+import org.jboss.netty.channel.Channel;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+
+public class TestBatch {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestBatch.class);
+
+    private static final int BATCH_SIZE = 1000;
+    private MetricsRegistry metrics = new NullMetricsProvider();
+
+    @Mock
+    private Channel channel;
+    @Mock
+    private RetryProcessor retryProcessor;
+    @Mock
+    private ReplyProcessor replyProcessor;
+
+    // The batch element to test
+    private PersistenceProcessorImpl.Batch batch;
+
+    @BeforeMethod(alwaysRun = true, timeOut = 30_000)
+    public void initMocksAndComponents() {
+        MockitoAnnotations.initMocks(this);
+        batch = new PersistenceProcessorImpl.Batch(BATCH_SIZE);
+    }
+
+    @Test
+    public void testBatchFunctionality() {
+
+        // Required mocks
+        Channel channel = Mockito.mock(Channel.class);
+        ReplyProcessor replyProcessor = Mockito.mock(ReplyProcessor.class);
+        RetryProcessor retryProcessor = Mockito.mock(RetryProcessor.class);
+
+        // The batch element to test
+        PersistenceProcessorImpl.Batch batch = new PersistenceProcessorImpl.Batch(BATCH_SIZE);
+
+        // Test initial state is OK
+        AssertJUnit.assertFalse("Batch shouldn't be full", batch.isFull());
+        AssertJUnit.assertEquals("Num events should be 0", 0, batch.getNumEvents());
+
+        // Test adding a single commit event is OK
+        MonitoringContext monCtx = new MonitoringContext(metrics);
+        monCtx.timerStart("commitPersistProcessor");
+        batch.addCommit(0, 1, channel, monCtx);
+        AssertJUnit.assertFalse("Batch shouldn't be full", batch.isFull());
+        AssertJUnit.assertEquals("Num events should be 1", 1, batch.getNumEvents());
+
+        // Test when filling the batch with events, batch is full
+        for (int i = 0; i < (BATCH_SIZE - 1); i++) {
+            if (i % 2 == 0) {
+                monCtx = new MonitoringContext(metrics);
+                monCtx.timerStart("timestampPersistProcessor");
+                batch.addTimestamp(i, channel, monCtx);
+            } else {
+                monCtx = new MonitoringContext(metrics);
+                monCtx.timerStart("commitPersistProcessor");
+                batch.addCommit(i, i + 1, channel, monCtx);
+            }
+        }
+        AssertJUnit.assertTrue("Batch should be full", batch.isFull());
+        AssertJUnit.assertEquals("Num events should be " + BATCH_SIZE, BATCH_SIZE, batch.getNumEvents());
+
+        // Test an exception is thrown when batch is full and a new element is going to be added
+        try {
+            monCtx = new MonitoringContext(metrics);
+            monCtx.timerStart("commitPersistProcessor");
+            batch.addCommit(0, 1, channel, new MonitoringContext(metrics));
+            Assert.fail("Should throw an IllegalStateException");
+        } catch (IllegalStateException e) {
+            AssertJUnit.assertEquals("message returned doesn't match", "batch full", e.getMessage());
+            LOG.debug("IllegalStateException catched properly");
+        }
+
+        // Test that sending replies empties the batch
+        final boolean MASTER_INSTANCE = true;
+        final boolean SHOULD_MAKE_HEURISTIC_DECISSION = true;
+        batch.sendRepliesAndReset(replyProcessor, retryProcessor, MASTER_INSTANCE);
+        verify(replyProcessor, timeout(100).times(BATCH_SIZE / 2))
+                .timestampResponse(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, timeout(100).times(BATCH_SIZE / 2))
+                .commitResponse(eq(!SHOULD_MAKE_HEURISTIC_DECISSION), anyLong(), anyLong(),
+                        any(Channel.class), any(MonitoringContext.class));
+        AssertJUnit.assertFalse("Batch shouldn't be full", batch.isFull());
+        AssertJUnit.assertEquals("Num events should be 0", 0, batch.getNumEvents());
+
+    }
+
+    @Test
+    public void testBatchFunctionalityWhenMastershipIsLost() {
+        Channel channel = Mockito.mock(Channel.class);
+
+        // Fill the batch with events till full
+        for (int i = 0; i < BATCH_SIZE; i++) {
+            if (i % 2 == 0) {
+                MonitoringContext monCtx = new MonitoringContext(metrics);
+                monCtx.timerStart("timestampPersistProcessor");
+                batch.addTimestamp(i, channel, monCtx);
+            } else {
+                MonitoringContext monCtx = new MonitoringContext(metrics);
+                monCtx.timerStart("commitPersistProcessor");
+                batch.addCommit(i, i + 1, channel, monCtx);
+            }
+        }
+
+        // Test that sending replies empties the batch also when the replica
+        // is NOT master and calls the ambiguousCommitResponse() method on the
+        // reply processor
+        final boolean MASTER_INSTANCE = true;
+        final boolean SHOULD_MAKE_HEURISTIC_DECISSION = true;
+        batch.sendRepliesAndReset(replyProcessor, retryProcessor, !MASTER_INSTANCE);
+        verify(replyProcessor, timeout(100).times(BATCH_SIZE / 2))
+                .timestampResponse(anyLong(), any(Channel.class), any(MonitoringContext.class));
+        verify(replyProcessor, timeout(100).times(BATCH_SIZE / 2))
+                .commitResponse(eq(SHOULD_MAKE_HEURISTIC_DECISSION), anyLong(), anyLong(), any(Channel.class), any(
+                        MonitoringContext.class));
+        assertFalse(batch.isFull(), "Batch shouldn't be full");
+        assertEquals(batch.getNumEvents(), 0, "Num events should be 0");
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/TestLeaseManager.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestLeaseManager.java b/tso-server/src/test/java/org/apache/omid/tso/TestLeaseManager.java
new file mode 100644
index 0000000..2eb4244
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestLeaseManager.java
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.common.base.Charsets;
+import org.apache.omid.TestUtils;
+import org.apache.omid.tso.TSOStateManager.TSOState;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.CloseableUtils;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+import static org.apache.omid.tso.client.TSOClient.DEFAULT_ZK_CLUSTER;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+
+public class TestLeaseManager {
+    // Tests for PausableLeaseManager and VoidLeaseManager; the ZK test server and client are set up once per class.
+    private static final long DUMMY_EPOCH_1 = 1L;
+    private static final long DUMMY_EPOCH_2 = 2L;
+    private static final long DUMMY_EPOCH_3 = 3L;
+    private static final long DUMMY_LOW_WATERMARK_1 = DUMMY_EPOCH_1;
+    private static final long DUMMY_LOW_WATERMARK_2 = DUMMY_EPOCH_2;
+    private static final long DUMMY_LOW_WATERMARK_3 = DUMMY_EPOCH_3;
+
+    private static final String LEASE_MGR_ID_1 = "LM1";
+    private static final String LEASE_MGR_ID_2 = "LM2";
+    private static final String INSTANCE_ID_1 = LEASE_MGR_ID_1 + "#";
+    private static final String INSTANCE_ID_2 = LEASE_MGR_ID_2 + "#";
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestLeaseManager.class);
+
+    private static final long TEST_LEASE_PERIOD_IN_MS = 1000; // 1 second
+
+    private CuratorFramework zkClient;
+    private TestingServer zkServer;
+    // NOTE(review): no MockitoAnnotations.initMocks(this) call is visible in this class, so this field may stay null
+    @Mock
+    private Panicker panicker;
+
+    private PausableLeaseManager leaseManager1;
+    private PausableLeaseManager leaseManager2;
+    /** Starts the ZK test server and obtains the ZK client shared by all the tests of this class. */
+    @BeforeClass
+    public void beforeClass() throws Exception {
+
+        LOG.info("Starting ZK Server");
+        zkServer = TestUtils.provideTestingZKServer();
+        LOG.info("ZK Server Started @ {}", zkServer.getConnectString());
+
+        zkClient = TestUtils.provideConnectedZKClient(DEFAULT_ZK_CLUSTER);
+
+    }
+    /** Closes the ZK client and then the ZK test server. */
+    @AfterClass
+    public void afterClass() throws Exception {
+
+        zkClient.close();
+
+        CloseableUtils.closeQuietly(zkServer);
+        zkServer = null;
+        LOG.info("ZK Server Stopped");
+
+    }
+    /** Checks that an IOException thrown while resetting the TSO state ends up in a call to the panicker. */
+    @Test(timeOut = 30_000)
+    public void testErrorInitializingTSOStateExitsTheTSO() throws Exception {
+
+        final String TEST_TSO_LEASE_PATH = "/test0_tsolease";
+        final String TEST_CURRENT_TSO_PATH = "/test0_currenttso";
+
+        Panicker panicker = spy(new MockPanicker());
+
+        TSOChannelHandler tsoChannelHandler = mock(TSOChannelHandler.class);
+        TSOStateManager stateManager = mock(TSOStateManager.class);
+        when(stateManager.reset()).thenThrow(new IOException());
+        leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
+                                                 tsoChannelHandler,
+                                                 stateManager,
+                                                 TEST_LEASE_PERIOD_IN_MS,
+                                                 TEST_TSO_LEASE_PATH,
+                                                 TEST_CURRENT_TSO_PATH,
+                                                 zkClient,
+                                                 panicker);
+        leaseManager1.startService();
+
+        // ... let the test run for some time...
+        Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+        verify(panicker, timeout(2000).atLeastOnce()).panic(anyString(), any(IOException.class));
+
+        leaseManager1.stopService();
+
+    }
+    /** Checks that a lone instance keeps the lease and its instance id across a pause in lease renewal and a resume. */
+    @Test(timeOut = 60_000)
+    public void testLeaseHolderDoesNotChangeWhenPausedForALongTimeAndTheresNoOtherInstance()
+            throws Exception {
+
+        final String TEST_TSO_LEASE_PATH = "/test1_tsolease";
+        final String TEST_CURRENT_TSO_PATH = "/test1_currenttso";
+
+        // Launch the instance under test...
+        TSOChannelHandler tsoChannelHandler1 = mock(TSOChannelHandler.class);
+        TSOStateManager stateManager1 = mock(TSOStateManager.class);
+        when(stateManager1.reset()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
+        leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
+                                                 tsoChannelHandler1,
+                                                 stateManager1,
+                                                 TEST_LEASE_PERIOD_IN_MS,
+                                                 TEST_TSO_LEASE_PATH,
+                                                 TEST_CURRENT_TSO_PATH,
+                                                 zkClient,
+                                                 panicker);
+        leaseManager1.startService();
+
+        // ... let the test run for some time...
+        Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+        // ... check is the lease holder
+        checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
+        checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
+        assertTrue(leaseManager1.stillInLeasePeriod());
+
+        // Then, pause instance when trying to renew lease...
+        leaseManager1.pausedInTryToRenewLeasePeriod();
+
+        // ...let the test run for some time...
+        Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+        // ...check that nothing changed...
+        checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
+        checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
+
+        // Finally, resume the instance...
+        leaseManager1.resume();
+
+        // ... let the test run for some time...
+        Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+        // ... and check again that nothing changed
+        checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
+        checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
+        assertTrue(leaseManager1.stillInLeasePeriod());
+
+    }
+    /** Checks that the first instance keeps the lease when a second lease manager is started on the same paths. */
+    @Test(timeOut = 60_000)
+    public void testLeaseHolderDoesNotChangeWhenANewLeaseManagerIsUp() throws Exception {
+
+        final String TEST_TSO_LEASE_PATH = "/test2_tsolease";
+        final String TEST_CURRENT_TSO_PATH = "/test2_currenttso";
+
+        // Launch the master instance...
+        TSOChannelHandler tsoChannelHandler1 = mock(TSOChannelHandler.class);
+        TSOStateManager stateManager1 = mock(TSOStateManager.class);
+        when(stateManager1.reset()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
+        leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
+                                                 tsoChannelHandler1,
+                                                 stateManager1,
+                                                 TEST_LEASE_PERIOD_IN_MS,
+                                                 TEST_TSO_LEASE_PATH,
+                                                 TEST_CURRENT_TSO_PATH,
+                                                 zkClient,
+                                                 panicker);
+
+        leaseManager1.startService();
+
+        // ...let the test run for some time...
+        Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+        // ...so it should be the current holder of the lease
+        checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
+        checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
+        assertTrue(leaseManager1.stillInLeasePeriod());
+
+        // Then launch another instance...
+        TSOChannelHandler tsoChannelHandler2 = mock(TSOChannelHandler.class);
+        TSOStateManager stateManager2 = mock(TSOStateManager.class);
+        when(stateManager2.reset()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_2, DUMMY_EPOCH_2));
+        leaseManager2 = new PausableLeaseManager(LEASE_MGR_ID_2,
+                                                 tsoChannelHandler2,
+                                                 stateManager2,
+                                                 TEST_LEASE_PERIOD_IN_MS,
+                                                 TEST_TSO_LEASE_PATH,
+                                                 TEST_CURRENT_TSO_PATH,
+                                                 zkClient,
+                                                 panicker);
+        leaseManager2.startService();
+
+        // ... let the test run for some time...
+        Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+        // ... and after the period, the first instance should be still the holder
+        checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
+        checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
+        assertTrue(leaseManager1.stillInLeasePeriod());
+        assertFalse(leaseManager2.stillInLeasePeriod());
+    }
+    /** Checks that the lease moves to the other instance each time the current holder is paused, and stays there. */
+    @Test(timeOut = 60_000)
+    public void testLeaseHolderChangesWhenActiveLeaseManagerIsPaused() throws Exception {
+
+        final String TEST_TSO_LEASE_PATH = "/test3_tsolease";
+        final String TEST_CURRENT_TSO_PATH = "/test3_currenttso";
+
+        // Launch the master instance...
+        TSOChannelHandler tsoChannelHandler1 = mock(TSOChannelHandler.class);
+        TSOStateManager stateManager1 = mock(TSOStateManager.class);
+        when(stateManager1.reset()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
+        leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
+                                                 tsoChannelHandler1,
+                                                 stateManager1,
+                                                 TEST_LEASE_PERIOD_IN_MS,
+                                                 TEST_TSO_LEASE_PATH,
+                                                 TEST_CURRENT_TSO_PATH,
+                                                 zkClient,
+                                                 panicker);
+
+        leaseManager1.startService();
+
+        // ... let the test run for some time...
+        Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+        // ... so it should be the current holder of the lease
+        checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
+        checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
+        assertTrue(leaseManager1.stillInLeasePeriod());
+
+        // Then launch another instance...
+        TSOChannelHandler tsoChannelHandler2 = mock(TSOChannelHandler.class);
+        TSOStateManager stateManager2 = mock(TSOStateManager.class);
+        when(stateManager2.reset()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_2, DUMMY_EPOCH_2));
+        leaseManager2 = new PausableLeaseManager(LEASE_MGR_ID_2,
+                                                 tsoChannelHandler2,
+                                                 stateManager2,
+                                                 TEST_LEASE_PERIOD_IN_MS,
+                                                 TEST_TSO_LEASE_PATH,
+                                                 TEST_CURRENT_TSO_PATH,
+                                                 zkClient,
+                                                 panicker);
+        leaseManager2.startService();
+
+        // ... and pause active lease manager...
+        leaseManager1.pausedInStillInLeasePeriod();
+
+        // ... and let the test run for some time...
+        Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+        // ... and check that lease owner should have changed to the second instance
+        checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_2);
+        checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_2 + "2");
+        assertTrue(leaseManager2.stillInLeasePeriod());
+
+        // Now, lets resume the first instance...
+        when(stateManager1.reset()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_3, DUMMY_EPOCH_3));
+        leaseManager1.resume();
+
+        // ... let the test run for some time...
+        Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+        // and check the lease owner is still the second instance (preserves the lease)
+        checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_2);
+        checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_2 + "2");
+        assertFalse(leaseManager1.stillInLeasePeriod());
+        assertTrue(leaseManager2.stillInLeasePeriod());
+
+        // Finally, pause active lease manager when trying to renew lease...
+        leaseManager2.pausedInTryToRenewLeasePeriod();
+
+        // ... let the test run for some time...
+        Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+        // ... and check lease owner has changed again to the first instance
+        checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
+        checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "3");
+        assertFalse(leaseManager2.stillInLeasePeriod());
+        assertTrue(leaseManager1.stillInLeasePeriod());
+
+        // Resume the second instance...
+        leaseManager2.resume();
+
+        // ... let the test run for some time...
+        Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+        // ... but the lease owner should still be the first instance
+        checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
+        checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "3");
+        assertFalse(leaseManager2.stillInLeasePeriod());
+        assertTrue(leaseManager1.stillInLeasePeriod());
+
+    }
+
+    /** Checks the panicker is called when the current-TSO znode holds corrupted data or another replica's info. */
+    @Test(timeOut = 40_000)
+    public void testLeaseManagerPanicsWhenUnexpectedInfoIsFoundInCurrentTSOZnode() throws Exception {
+
+        final String TEST_TSO_LEASE_PATH = "/test_wronginfo_tsolease";
+        final String TEST_CURRENT_TSO_PATH = "/test_wronginfo_currenttso";
+
+        Panicker panicker = spy(new MockPanicker());
+
+        // Launch the master instance...
+        TSOStateManager stateManager1 = mock(TSOStateManager.class);
+        when(stateManager1.reset()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
+        PausableLeaseManager leaseManager = new PausableLeaseManager(LEASE_MGR_ID_1,
+                                                                     mock(TSOChannelHandler.class),
+                                                                     stateManager1,
+                                                                     TEST_LEASE_PERIOD_IN_MS,
+                                                                     TEST_TSO_LEASE_PATH,
+                                                                     TEST_CURRENT_TSO_PATH,
+                                                                     zkClient,
+                                                                     panicker);
+
+        leaseManager.startService();
+        // ...and let the test run for some time...
+        Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+        leaseManager.pausedInTryToRenewLeasePeriod();
+
+        // 1st Panic test) Inject corrupted data in the ZNode, force reelection and test the panicker is exercised
+        zkClient.setData().forPath(TEST_CURRENT_TSO_PATH, "CorruptedData!!!".getBytes(Charsets.UTF_8));
+
+        // ...and let the test run for some time...
+        Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+        leaseManager.resume();
+        // ...and let the test run for some time...
+        Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+        ArgumentCaptor<IllegalArgumentException> throwableIAE = ArgumentCaptor.forClass(IllegalArgumentException.class);
+        verify(panicker).panic(anyString(), throwableIAE.capture());
+        assertTrue(throwableIAE.getValue() instanceof IllegalArgumentException);
+        assertTrue(throwableIAE.getValue().getMessage().contains("Incorrect TSO Info found"));
+
+        // 2nd Panic test) Simulate that a new master appeared in the meantime, force reelection
+        // and test the panicker is exercised
+        reset(panicker);
+        zkClient.setData().forPath(TEST_CURRENT_TSO_PATH, "newTSO:12345#10000".getBytes(Charsets.UTF_8));
+
+        leaseManager.pausedInTryToRenewLeasePeriod();
+
+        // ...and let the test run for some time...
+        Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+        leaseManager.resume();
+        // ...and let the test run for some time...
+        Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+        ArgumentCaptor<LeaseManagement.LeaseManagementException> throwableLME =
+                ArgumentCaptor.forClass(LeaseManagement.LeaseManagementException.class);
+        verify(panicker).panic(anyString(), throwableLME.capture());
+        assertTrue(throwableLME.getValue() instanceof LeaseManagement.LeaseManagementException);
+        assertTrue(throwableLME.getValue().getMessage().contains("Another TSO replica was found"));
+    }
+    /** Checks that a VoidLeaseManager reports being in its lease period once started. */
+    @Test(timeOut = 1000)
+    public void testNonHALeaseManager() throws Exception {
+
+        // Launch the instance...
+        VoidLeaseManager leaseManager = new VoidLeaseManager(mock(TSOChannelHandler.class),
+                                                             mock(TSOStateManager.class));
+
+        leaseManager.startService();
+        assertTrue(leaseManager.stillInLeasePeriod());
+        leaseManager.stopService();
+
+    }
+
+    // ----------------------------------------------------------------------------------------------------------------
+    // Checkers
+    // ----------------------------------------------------------------------------------------------------------------
+    /** Asserts that the znode at tsoLeasePath contains expectedLeaseHolder, decoded as UTF-8. */
+    private void checkLeaseHolder(String tsoLeasePath, String expectedLeaseHolder) throws Exception {
+        byte[] leaseHolderInBytes = zkClient.getData().forPath(tsoLeasePath);
+        String leaseHolder = new String(leaseHolderInBytes, Charsets.UTF_8);
+
+        assertEquals(leaseHolder, expectedLeaseHolder);
+    }
+    /** Asserts that the znode at currentTSOPath contains expectedInstanceId, decoded as UTF-8. */
+    private void checkInstanceId(String currentTSOPath, String expectedInstanceId) throws Exception {
+        byte[] instanceIdInBytes = zkClient.getData().forPath(currentTSOPath);
+        String instanceId = new String(instanceIdInBytes, Charsets.UTF_8);
+
+        assertEquals(instanceId, expectedInstanceId);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/TestLongCache.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestLongCache.java b/tso-server/src/test/java/org/apache/omid/tso/TestLongCache.java
new file mode 100644
index 0000000..6a88686
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestLongCache.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+
+public class TestLongCache {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestLongCache.class);
+
+    private static final long TEST_VALUE = 1000;
+
+    private Random random = new Random(System.currentTimeMillis());
+
+    @Test
+    public void testAddAndGetElemsAndResetCache() {
+
+        // Cache configuration
+        final int CACHE_SIZE = 10_000_000;
+        final int CACHE_ASSOCIATIVITY = 32;
+        Cache cache = new LongCache(CACHE_SIZE, CACHE_ASSOCIATIVITY);
+
+        // After creation, cache values should be the default
+        for (int i = 0; i < 1000; i++) {
+            long position = random.nextLong();
+            assertEquals(cache.get(position), LongCache.RESET_VALUE);
+        }
+
+        Set<Long> testedKeys = new TreeSet<>();
+        // Populate some of the values
+        for (int i = 0; i < 1000; i++) {
+            long position = random.nextLong();
+            cache.set(position, TEST_VALUE);
+            testedKeys.add(position);
+        }
+
+        // Get the values and check them
+        for (long key : testedKeys) {
+            assertEquals(cache.get(key), TEST_VALUE);
+        }
+
+        // Reset cache and check the values are the default again
+        long startTimeInMs = System.currentTimeMillis();
+        cache.reset();
+        long endTimeInMs = System.currentTimeMillis();
+        long resetTimeInMs = endTimeInMs - startTimeInMs;
+        LOG.info("Time in reseting cache of {}/{} elems/asoc {}ms", CACHE_SIZE, CACHE_ASSOCIATIVITY, resetTimeInMs);
+
+        for (long key : testedKeys) {
+            assertEquals(cache.get(key), LongCache.RESET_VALUE);
+        }
+
+    }
+
+    @Test(timeOut = 10000)
+    public void testEntriesAge() {
+
+        final int entries = 1000;
+
+        Cache cache = new LongCache(entries, 16);
+
+        int removals = 0;
+        long totalAge = 0;
+        double tempStdDev = 0;
+        double tempAvg = 0;
+
+        int i = 0;
+        int largestDeletedTimestamp = 0;
+        for (; i < entries * 10; ++i) {
+            long removed = cache.set(random.nextLong(), i);
+            if (removed > largestDeletedTimestamp) {
+                largestDeletedTimestamp = (int) removed;
+            }
+        }
+
+        long time = System.nanoTime();
+        for (; i < entries * 100; ++i) {
+            long removed = cache.set(random.nextLong(), i);
+            if (removed > largestDeletedTimestamp) {
+                largestDeletedTimestamp = (int) removed;
+            }
+            int gap = i - ((int) largestDeletedTimestamp);
+            removals++;
+            totalAge += gap;
+            double oldAvg = tempAvg;
+            tempAvg += (gap - tempAvg) / removals;
+            tempStdDev += (gap - oldAvg) * (gap - tempAvg);
+        }
+        long elapsed = System.nanoTime() - time;
+        LOG.info("Elapsed (ms): " + (elapsed / (double) 1000));
+
+        double avgGap = totalAge / (double) removals;
+        LOG.info("Avg gap: " + (tempAvg));
+        LOG.info("Std dev gap: " + Math.sqrt((tempStdDev / entries)));
+        assertTrue("avgGap should be greater than entries * 0.6",
+                avgGap > entries * 0.6);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
new file mode 100644
index 0000000..816194d
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.timestamp.storage.TimestampStorage;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Checks that failures in the timestamp oracle and in the persistence
+ * processor are reported through {@link Panicker#panic(String, Throwable)}.
+ *
+ * Each test spies on a {@link MockPanicker}, makes a collaborator throw and
+ * then waits up to one second for the panic to be signalled.
+ */
+@SuppressWarnings({"UnusedDeclaration"})
+public class TestPanicker {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestPanicker.class);
+
+    @Mock
+    private CommitTable.Writer mockWriter;
+    @Mock
+    private MetricsRegistry metrics;
+
+    /** Creates fresh instances for the {@code @Mock} fields before every test. */
+    @BeforeMethod
+    public void initMocksAndComponents() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    /** Clears the stubbing and recorded interactions of the shared writer mock. */
+    @AfterMethod
+    void afterMethod() {
+        Mockito.reset(mockWriter);
+    }
+
+    /**
+     * Makes the timestamp storage throw when the maximum timestamp is updated
+     * and expects the oracle to panic while a background thread keeps
+     * requesting timestamps.
+     */
+    // Note this test has been moved and refactored to TestTimestampOracle because
+    // it tests the behaviour of the TimestampOracle.
+    // Please, remove me in a future commit
+    @Test
+    public void testTimestampOraclePanic() throws Exception {
+        TimestampStorage storage = spy(new TimestampOracleImpl.InMemoryTimestampStorage());
+        Panicker panicker = spy(new MockPanicker());
+
+        doThrow(new RuntimeException("Out of memory")).when(storage).updateMaxTimestamp(anyLong(), anyLong());
+
+        final TimestampOracleImpl tso = new TimestampOracleImpl(metrics, storage, panicker);
+        tso.initialize();
+        Thread allocThread = new Thread("AllocThread") {
+            @Override
+            public void run() {
+                try {
+                    // Keep allocating until the test asks this thread to stop
+                    while (!isInterrupted()) {
+                        tso.next();
+                    }
+                } catch (IOException ioe) {
+                    LOG.error("Shouldn't occur", ioe);
+                }
+            }
+        };
+        // Daemon, so a worker that is still inside tso.next() when the test
+        // ends cannot keep the JVM alive
+        allocThread.setDaemon(true);
+        allocThread.start();
+
+        try {
+            verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
+        } finally {
+            // Do not leave the allocation loop spinning behind later tests
+            allocThread.interrupt();
+        }
+    }
+
+    /**
+     * Makes the commit table writer throw an {@link IOException} on flush and
+     * expects the persistence processor to panic after a commit is persisted.
+     */
+    // Note this test has been moved and refactored to TestPersistenceProcessor because
+    // it tests the behaviour of the PersistenceProcessor.
+    // Please, remove me in a future commit
+    @Test
+    public void testCommitTablePanic() throws Exception {
+        Panicker panicker = spy(new MockPanicker());
+
+        doThrow(new IOException("Unable to write@TestPanicker")).when(mockWriter).flush();
+
+        CommitTable commitTable = commitTableWith(mockWriter, mock(CommitTable.Client.class));
+
+        LeaseManager leaseManager = mock(LeaseManager.class);
+        doReturn(true).when(leaseManager).stillInLeasePeriod();
+        PersistenceProcessor proc = new PersistenceProcessorImpl(new TSOServerConfig(),
+                                                                 metrics,
+                                                                 "localhost:1234",
+                                                                 leaseManager,
+                                                                 commitTable,
+                                                                 mock(ReplyProcessor.class),
+                                                                 mock(RetryProcessor.class),
+                                                                 panicker);
+        proc.persistCommit(1, 2, null, new MonitoringContext(metrics));
+        verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
+    }
+
+    /**
+     * Makes the commit table writer throw a {@link RuntimeException} when a
+     * committed transaction is added and expects the persistence processor to
+     * panic.
+     */
+    // Note this test has been moved and refactored to TestPersistenceProcessor because
+    // it tests the behaviour of the PersistenceProcessor.
+    // Please, remove me in a future commit
+    @Test
+    public void testRuntimeExceptionTakesDownDaemon() throws Exception {
+        Panicker panicker = spy(new MockPanicker());
+
+        // A dedicated mock (instead of shadowing the mockWriter field)
+        CommitTable.Writer explodingWriter = mock(CommitTable.Writer.class);
+        doThrow(new RuntimeException("Kaboom!")).when(explodingWriter).addCommittedTransaction(anyLong(), anyLong());
+
+        CommitTable commitTable = commitTableWith(explodingWriter, mock(CommitTable.Client.class));
+        PersistenceProcessor proc = new PersistenceProcessorImpl(new TSOServerConfig(),
+                                                                 metrics,
+                                                                 "localhost:1234",
+                                                                 mock(LeaseManager.class),
+                                                                 commitTable,
+                                                                 mock(ReplyProcessor.class),
+                                                                 mock(RetryProcessor.class),
+                                                                 panicker);
+        proc.persistCommit(1, 2, null, new MonitoringContext(metrics));
+        verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
+    }
+
+    /** Builds a commit table that always hands out the given writer and client. */
+    private static CommitTable commitTableWith(final CommitTable.Writer writer, final CommitTable.Client client) {
+        return new CommitTable() {
+            @Override
+            public Writer getWriter() {
+                return writer;
+            }
+
+            @Override
+            public Client getClient() {
+                return client;
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
new file mode 100644
index 0000000..eb93277
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.metrics.NullMetricsProvider;
+import org.apache.omid.tso.PersistenceProcessorImpl.Batch;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests of {@link PersistenceProcessorImpl}: which mastership flag is handed
+ * to the batch when replies are sent, and that failures of the commit table
+ * writer are reported to the {@link Panicker}.
+ */
+@SuppressWarnings({"UnusedDeclaration"})
+public class TestPersistenceProcessor {
+
+    @Mock
+    private Batch batch;
+    @Mock
+    private CommitTable.Writer mockWriter;
+    @Mock
+    private CommitTable.Client mockClient;
+    @Mock
+    private RetryProcessor retryProcessor;
+    @Mock
+    private ReplyProcessor replyProcessor;
+    @Mock
+    private Panicker panicker;
+
+    private MetricsRegistry metrics;
+    private CommitTable commitTable;
+
+    /**
+     * Re-creates the mocks, the metrics registry and a commit table backed by
+     * the mocked writer and client before every test.
+     */
+    @BeforeMethod(alwaysRun = true, timeOut = 30_000)
+    public void initMocksAndComponents() throws Exception {
+
+        MockitoAnnotations.initMocks(this);
+
+        // Configure the mocked writer to throw an IOException on every flush.
+        // NOTE(review): this comment used to read "flush successfully", which
+        // contradicts the stubbing below; confirm which of the two is intended
+        doThrow(new IOException("Unable to write")).when(mockWriter).flush();
+
+        // Configure null metrics provider
+        metrics = new NullMetricsProvider();
+
+        // Configure commit table to return the mocked writer and client
+        commitTable = new CommitTable() {
+            @Override
+            public Writer getWriter() {
+                return mockWriter;
+            }
+
+            @Override
+            public Client getClient() {
+                return mockClient;
+            }
+        };
+    }
+
+    /** Clears the stubbing and recorded interactions of the writer mock. */
+    @AfterMethod
+    void afterMethod() {
+        Mockito.reset(mockWriter);
+    }
+
+    /**
+     * With a {@link VoidLeaseManager} the batch is expected to send its
+     * replies with the mastership flag set to {@code true}.
+     */
+    @Test
+    public void testCommitPersistenceWithNonHALeaseManager() throws Exception {
+
+        // Init a non-HA lease manager
+        VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
+                                                                 mock(TSOStateManager.class)));
+
+        TSOServerConfig tsoServerConfig = new TSOServerConfig();
+        tsoServerConfig.setBatchPersistTimeoutInMs(100);
+        // Component under test
+        PersistenceProcessor proc = new PersistenceProcessorImpl(tsoServerConfig,
+                                                                 metrics,
+                                                                 "localhost:1234",
+                                                                 batch,
+                                                                 leaseManager,
+                                                                 commitTable,
+                                                                 replyProcessor,
+                                                                 retryProcessor,
+                                                                 panicker);
+
+        // The non-ha lease manager always return true for
+        // stillInLeasePeriod(), so verify the batch sends replies as master
+        MonitoringContext monCtx = new MonitoringContext(metrics);
+        proc.persistCommit(1, 2, null, monCtx);
+        verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
+        verify(batch, timeout(1000).times(2)).sendRepliesAndReset(any(ReplyProcessor.class),
+                                                                  any(RetryProcessor.class),
+                                                                  eq(true));
+    }
+
+    /**
+     * With a mocked {@link LeaseManager} the mastership flag passed to the
+     * batch is expected to follow the answers of {@code stillInLeasePeriod()}:
+     * always true, true then false, and always false.
+     */
+    @Test
+    public void testCommitPersistenceWithHALeaseManager() throws Exception {
+
+        // Init a HA lease manager
+        LeaseManager leaseManager = mock(LeaseManager.class);
+
+        TSOServerConfig tsoServerConfig = new TSOServerConfig();
+        tsoServerConfig.setBatchPersistTimeoutInMs(100);
+        // Component under test
+        PersistenceProcessor proc = new PersistenceProcessorImpl(tsoServerConfig,
+                                                                 metrics,
+                                                                 "localhost:1234",
+                                                                 batch,
+                                                                 leaseManager,
+                                                                 commitTable,
+                                                                 replyProcessor,
+                                                                 retryProcessor,
+                                                                 panicker);
+
+        // Configure the lease manager to always return true for
+        // stillInLeasePeriod, so verify the batch sends replies as master.
+        // The batch verifications wait with a timeout, like the lease manager
+        // ones, so they do not race with the processor
+        doReturn(true).when(leaseManager).stillInLeasePeriod();
+        MonitoringContext monCtx = new MonitoringContext(metrics);
+        proc.persistCommit(1, 2, null, monCtx);
+        verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
+        verify(batch, timeout(1000)).sendRepliesAndReset(any(ReplyProcessor.class), any(RetryProcessor.class), eq(true));
+
+        // Configure the lease manager to return true first and false
+        // afterwards for stillInLeasePeriod, so verify the batch sends replies
+        // as non-master
+        reset(leaseManager);
+        reset(batch);
+        doReturn(true).doReturn(false).when(leaseManager).stillInLeasePeriod();
+        proc.persistCommit(1, 2, null, monCtx);
+        verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
+        verify(batch, timeout(1000)).sendRepliesAndReset(any(ReplyProcessor.class), any(RetryProcessor.class), eq(false));
+
+        // Configure the lease manager to always return false for
+        // stillInLeasePeriod, so verify the batch sends replies as non-master
+        reset(leaseManager);
+        reset(batch);
+        doReturn(false).when(leaseManager).stillInLeasePeriod();
+        proc.persistCommit(1, 2, null, monCtx);
+        verify(leaseManager, timeout(1000).times(1)).stillInLeasePeriod();
+        verify(batch, timeout(1000)).sendRepliesAndReset(any(ReplyProcessor.class), any(RetryProcessor.class), eq(false));
+
+    }
+
+    /**
+     * An {@link IOException} thrown by the writer on flush is expected to
+     * reach the {@link Panicker}.
+     */
+    @Test
+    public void testCommitTableExceptionOnCommitPersistenceTakesDownDaemon() throws Exception {
+
+        // Init lease management (doesn't matter if HA or not)
+        LeaseManagement leaseManager = mock(LeaseManagement.class);
+        PersistenceProcessor proc = new PersistenceProcessorImpl(new TSOServerConfig(),
+                                                                 metrics,
+                                                                 "localhost:1234",
+                                                                 leaseManager,
+                                                                 commitTable,
+                                                                 mock(ReplyProcessor.class),
+                                                                 mock(RetryProcessor.class),
+                                                                 panicker);
+        MonitoringContext monCtx = new MonitoringContext(metrics);
+
+        // Configure lease manager to work normally
+        doReturn(true).when(leaseManager).stillInLeasePeriod();
+
+        // Configure commit table writer to explode when flushing changes to DB
+        doThrow(new IOException("Unable to write@TestPersistenceProcessor2")).when(mockWriter).flush();
+
+        // Check the panic is extended!
+        proc.persistCommit(1, 2, null, monCtx);
+        verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
+
+    }
+
+    /**
+     * A {@link RuntimeException} thrown by the writer when a committed
+     * transaction is added is expected to reach the {@link Panicker}.
+     */
+    @Test
+    public void testRuntimeExceptionOnCommitPersistenceTakesDownDaemon() throws Exception {
+
+        PersistenceProcessor proc = new PersistenceProcessorImpl(new TSOServerConfig(),
+                                                                 metrics,
+                                                                 "localhost:1234",
+                                                                 mock(LeaseManagement.class),
+                                                                 commitTable,
+                                                                 replyProcessor,
+                                                                 retryProcessor,
+                                                                 panicker);
+
+        // Configure writer to explode with a runtime exception
+        doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong());
+        MonitoringContext monCtx = new MonitoringContext(metrics);
+
+        // Check the panic is extended!
+        proc.persistCommit(1, 2, null, monCtx);
+        verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
new file mode 100644
index 0000000..c128beb
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.common.collect.Lists;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.metrics.NullMetricsProvider;
+import org.jboss.netty.channel.Channel;
+import org.mockito.ArgumentCaptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.testng.AssertJUnit.assertTrue;
+
+/**
+ * Tests of {@link RequestProcessorImpl} against a mocked
+ * {@link PersistenceProcessor}: every test issues requests and verifies which
+ * persist* call the request processor makes in response.
+ */
+public class TestRequestProcessor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestRequestProcessor.class);
+
+    private static final int CONFLICT_MAP_SIZE = 1000;
+    private static final int CONFLICT_MAP_ASSOCIATIVITY = 32;
+
+    private MetricsRegistry metrics = new NullMetricsProvider();
+
+    private PersistenceProcessor persist;
+
+    private TSOStateManager stateManager;
+
+    // Request processor under test
+    private RequestProcessor requestProc;
+
+    /**
+     * Builds a request processor on top of an in-memory timestamp oracle and a
+     * mocked persistence processor, registers it in a state manager and resets
+     * the state.
+     */
+    @BeforeMethod
+    public void beforeMethod() throws Exception {
+
+        // Build the required scaffolding for the test. The field is assigned
+        // (rather than shadowed by a local) so the tests use the same registry
+        metrics = new NullMetricsProvider();
+
+        TimestampOracleImpl timestampOracle =
+                new TimestampOracleImpl(metrics, new TimestampOracleImpl.InMemoryTimestampStorage(), new MockPanicker());
+
+        stateManager = new TSOStateManagerImpl(timestampOracle);
+
+        persist = mock(PersistenceProcessor.class);
+
+        TSOServerConfig config = new TSOServerConfig();
+        config.setMaxItems(CONFLICT_MAP_SIZE);
+
+        requestProc = new RequestProcessorImpl(config, metrics, timestampOracle, persist, new MockPanicker());
+
+        // Initialize the state for the experiment
+        stateManager.register(requestProc);
+        stateManager.reset();
+
+    }
+
+    /**
+     * Requests 101 timestamps and checks that each one is persisted and is one
+     * greater than the previous one.
+     */
+    @Test(timeOut = 30_000)
+    public void testTimestamp() throws Exception {
+
+        requestProc.timestampRequest(null, new MonitoringContext(metrics));
+        ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
+        verify(persist, timeout(100).times(1)).persistTimestamp(
+                firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+
+        long lastTS = firstTScapture.getValue();
+        // verify that timestamps increase monotonically. Each iteration checks
+        // the timestamp produced by its own request (the one after the last
+        // verified), so the final request is covered too
+        for (int i = 0; i < 100; i++) {
+            requestProc.timestampRequest(null, new MonitoringContext(metrics));
+            long expectedTS = ++lastTS;
+            verify(persist, timeout(100).times(1)).persistTimestamp(eq(expectedTS), any(Channel.class), any(MonitoringContext.class));
+        }
+
+    }
+
+    /**
+     * Exercises commit requests: one for a start timestamp below the first one
+     * handed out (expected to abort), a regular commit, and two transactions
+     * writing the same cells, where the one that commits second is expected to
+     * abort.
+     */
+    @Test(timeOut = 30_000)
+    public void testCommit() throws Exception {
+
+        requestProc.timestampRequest(null, new MonitoringContext(metrics));
+        ArgumentCaptor<Long> TScapture = ArgumentCaptor.forClass(Long.class);
+        verify(persist, timeout(100).times(1)).persistTimestamp(
+                TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+        long firstTS = TScapture.getValue();
+
+        List<Long> writeSet = Lists.newArrayList(1L, 20L, 203L);
+        requestProc.commitRequest(firstTS - 1, writeSet, false, null, new MonitoringContext(metrics));
+        verify(persist, timeout(100).times(1)).persistAbort(eq(firstTS - 1), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+
+        requestProc.commitRequest(firstTS, writeSet, false, null, new MonitoringContext(metrics));
+        ArgumentCaptor<Long> commitTScapture = ArgumentCaptor.forClass(Long.class);
+
+        verify(persist, timeout(100).times(1)).persistCommit(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+        assertTrue("Commit TS must be greater than start TS", commitTScapture.getValue() > firstTS);
+
+        // test conflict
+        requestProc.timestampRequest(null, new MonitoringContext(metrics));
+        TScapture = ArgumentCaptor.forClass(Long.class);
+        verify(persist, timeout(100).times(2)).persistTimestamp(
+                TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+        long secondTS = TScapture.getValue();
+
+        requestProc.timestampRequest(null, new MonitoringContext(metrics));
+        TScapture = ArgumentCaptor.forClass(Long.class);
+        verify(persist, timeout(100).times(3)).persistTimestamp(
+                TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+        long thirdTS = TScapture.getValue();
+
+        // The later transaction commits first, so the earlier one must abort
+        requestProc.commitRequest(thirdTS, writeSet, false, null, new MonitoringContext(metrics));
+        verify(persist, timeout(100).times(1)).persistCommit(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContext.class));
+        requestProc.commitRequest(secondTS, writeSet, false, null, new MonitoringContext(metrics));
+        verify(persist, timeout(100).times(1)).persistAbort(eq(secondTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+
+    }
+
+    /**
+     * A transaction started before the state is reset is expected to be
+     * aborted when it tries to commit after the reset.
+     */
+    @Test(timeOut = 30_000)
+    public void testCommitRequestAbortsWhenResettingRequestProcessorState() throws Exception {
+
+        List<Long> writeSet = Collections.emptyList();
+
+        // Start a transaction...
+        requestProc.timestampRequest(null, new MonitoringContext(metrics));
+        ArgumentCaptor<Long> capturedTS = ArgumentCaptor.forClass(Long.class);
+        verify(persist, timeout(100).times(1)).persistTimestamp(capturedTS.capture(),
+                                                                any(Channel.class),
+                                                                any(MonitoringContext.class));
+        long startTS = capturedTS.getValue();
+
+        // ... simulate the reset of the RequestProcessor state (e.g. due to
+        // a change in mastership) and...
+        stateManager.reset();
+
+        // ...check that the transaction is aborted when trying to commit
+        requestProc.commitRequest(startTS, writeSet, false, null, new MonitoringContext(metrics));
+        verify(persist, timeout(100).times(1)).persistAbort(eq(startTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+
+    }
+
+    /**
+     * Sends enough single-element commits to overflow the conflict map and
+     * checks which low watermark values are persisted: 0 and 1 exactly once
+     * each, 2 never.
+     */
+    @Test(timeOut = 5_000)
+    public void testLowWatermarkIsStoredOnlyWhenACacheElementIsEvicted() throws Exception {
+
+        final int ANY_START_TS = 1;
+        final long FIRST_COMMIT_TS_EVICTED = 1L;
+        final long NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED = 2L;
+
+        // Fill the cache to provoke a cache eviction
+        for (long i = 0; i < CONFLICT_MAP_SIZE + CONFLICT_MAP_ASSOCIATIVITY; i++) {
+            long writeSetElementHash = i + 1; // This is to match the assigned CT: K/V in cache = WS Element Hash/CT
+            List<Long> writeSet = Lists.newArrayList(writeSetElementHash);
+            requestProc.commitRequest(ANY_START_TS, writeSet, false, null, new MonitoringContext(metrics));
+        }
+
+        Thread.sleep(3000); // Allow the Request processor to finish the request processing
+
+        // Check that first time its called is on init
+        verify(persist, timeout(100).times(1)).persistLowWatermark(0L);
+        // Then, check it is called when cache is full and the first element is evicted (should be a 1)
+        verify(persist, timeout(100).times(1)).persistLowWatermark(FIRST_COMMIT_TS_EVICTED);
+        // Finally it should never be called with the next element
+        verify(persist, timeout(100).never()).persistLowWatermark(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED);
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
new file mode 100644
index 0000000..9ba4f9d
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.common.base.Optional;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.committable.CommitTable.CommitTimestamp;
+import org.apache.omid.committable.InMemoryCommitTable;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.metrics.NullMetricsProvider;
+import org.jboss.netty.channel.Channel;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.testng.AssertJUnit.assertEquals;
+
+/**
+ * Tests of {@link RetryProcessorImpl}: how a retried request is answered
+ * depending on what the commit table holds for its start timestamp.
+ */
+public class TestRetryProcessor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestRetryProcessor.class);
+
+    private MetricsRegistry metrics = new NullMetricsProvider();
+
+    // Start (ST) and commit (CT) timestamps used as fixtures by the tests
+    private static final long NON_EXISTING_ST_TX = 1000;
+    private static final long ST_TX_1 = 0;
+    private static final long CT_TX_1 = 1;
+    private static final long ST_TX_2 = 2;
+
+    @Mock
+    private Channel channel;
+    @Mock
+    private ReplyProcessor replyProc;
+    @Mock
+    private Panicker panicker;
+
+    private CommitTable commitTable;
+
+    /** Re-creates the mocks, an empty in-memory commit table and the metrics. */
+    @BeforeMethod(alwaysRun = true, timeOut = 30_000)
+    public void initMocksAndComponents() {
+        MockitoAnnotations.initMocks(this);
+        // Init components
+        commitTable = new InMemoryCommitTable();
+        metrics = new NullMetricsProvider();
+
+    }
+
+    /**
+     * A retry for a start timestamp missing from the commit table is expected
+     * to be answered with an abort; a retry for a start timestamp present in
+     * the commit table with a commit carrying the stored commit timestamp.
+     */
+    @Test(timeOut = 10_000)
+    public void testBasicFunctionality() throws Exception {
+
+        // The element to test
+        RetryProcessor retryProc = new RetryProcessorImpl(metrics, commitTable, replyProc, panicker);
+
+        // Test we'll reply with an abort for a retry request when the start timestamp IS NOT in the commit table
+        retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, new MonitoringContext(metrics));
+        ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
+        verify(replyProc, timeout(100).times(1)).abortResponse(firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+
+        long startTS = firstTScapture.getValue();
+        assertEquals("Captured timestamp should be the same as NON_EXISTING_ST_TX", NON_EXISTING_ST_TX, startTS);
+
+        // Test we'll reply with a commit for a retry request when the start timestamp IS in the commit table
+        commitTable.getWriter().addCommittedTransaction(ST_TX_1, CT_TX_1); // Add a tx to commit table
+
+        retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
+        ArgumentCaptor<Long> secondTScapture = ArgumentCaptor.forClass(Long.class);
+        verify(replyProc, timeout(100).times(1))
+                .commitResponse(eq(false), firstTScapture.capture(), secondTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+
+        // firstTScapture is reused: getValue() yields the most recent capture
+        startTS = firstTScapture.getValue();
+        long commitTS = secondTScapture.getValue();
+        assertEquals("Captured timestamp should be the same as ST_TX_1", ST_TX_1, startTS);
+        assertEquals("Captured timestamp should be the same as CT_TX_1", CT_TX_1, commitTS);
+    }
+
+    /**
+     * A retry for a transaction that has been invalidated in the commit table
+     * is expected to be answered with an abort.
+     */
+    @Test(timeOut = 10_000)
+    public void testRetriedRequestForInvalidatedTransactionReturnsAnAbort() throws Exception {
+
+        // Invalidate the transaction
+        commitTable.getClient().tryInvalidateTransaction(ST_TX_1);
+
+        // Pre-start verification: Validate that the transaction is invalidated
+        // NOTE: This check should be in a test class for InMemoryCommitTable
+        Optional<CommitTimestamp> invalidTxMarker = commitTable.getClient().getCommitTimestamp(ST_TX_1).get();
+        Assert.assertTrue(invalidTxMarker.isPresent());
+        Assert.assertEquals(invalidTxMarker.get().getValue(), InMemoryCommitTable.INVALID_TRANSACTION_MARKER);
+
+        // The element to test
+        RetryProcessor retryProc = new RetryProcessorImpl(metrics, commitTable, replyProc, panicker);
+
+        // Test we'll reply with an abort for a retry request when the
+        // transaction id IS in the commit table BUT invalidated
+        retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
+        ArgumentCaptor<Long> startTScapture = ArgumentCaptor.forClass(Long.class);
+        verify(replyProc, timeout(100).times(1)).abortResponse(startTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+
+        long startTS = startTScapture.getValue();
+        // The message names the constant that is actually compared
+        Assert.assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as ST_TX_1");
+
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
new file mode 100644
index 0000000..0f64181
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.omid.metrics.NullMetricsProvider;
+import org.apache.omid.proto.TSOProto;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
+import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyCollectionOf;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+@SuppressWarnings({"UnusedDeclaration", "StatementWithEmptyBody"})
+public class TestTSOChannelHandlerNetty {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestTSOChannelHandlerNetty.class);
+
+    @Mock
+    private
+    RequestProcessor requestProcessor;
+
+    // Component under test
+    private TSOChannelHandler channelHandler;
+
+    /**
+     * Prepares every test: initializes the Mockito-annotated fields and builds a new channel handler from a
+     * server configuration whose port is set to 1434.
+     */
+    @BeforeMethod
+    public void beforeTestMethod() {
+        // The server configuration does not depend on the mocks, so it can be prepared first
+        TSOServerConfig serverConfig = new TSOServerConfig();
+        serverConfig.setPort(1434);
+        // Populate the @Mock fields before handing the request processor to the handler
+        MockitoAnnotations.initMocks(this);
+        channelHandler = new TSOChannelHandler(serverConfig, requestProcessor, new NullMetricsProvider());
+    }
+
+    /**
+     * Closes the channel handler built for the test that has just run.
+     *
+     * @throws IOException if closing the channel handler fails
+     */
+    @AfterMethod
+    public void afterTestMethod() throws IOException {
+        channelHandler.close();
+    }
+
+    /**
+     * Exercises the connection life-cycle calls of the channel handler (reconnect(), closeConnection() and
+     * close()) and, after each step, checks whether the listening channel is open and how many channels the
+     * channel group holds.
+     */
+    @Test(timeOut = 10_000)
+    public void testMainAPI() throws Exception {
+
+        // Check initial state
+        assertNull(channelHandler.listeningChannel);
+        assertNull(channelHandler.channelGroup);
+
+        // Check initial connection
+        channelHandler.reconnect();
+        assertTrue(channelHandler.listeningChannel.isOpen());
+        assertEquals(channelHandler.channelGroup.size(), 1);
+        // The listening channel must be bound to the port set in the server configuration
+        assertEquals(((InetSocketAddress) channelHandler.listeningChannel.getLocalAddress()).getPort(), 1434);
+
+        // Check connection close
+        channelHandler.closeConnection();
+        assertFalse(channelHandler.listeningChannel.isOpen());
+        assertEquals(channelHandler.channelGroup.size(), 0);
+
+        // Check re-closing connection
+        channelHandler.closeConnection();
+        assertFalse(channelHandler.listeningChannel.isOpen());
+        assertEquals(channelHandler.channelGroup.size(), 0);
+
+        // Check connection after closing
+        channelHandler.reconnect();
+        assertTrue(channelHandler.listeningChannel.isOpen());
+        assertEquals(channelHandler.channelGroup.size(), 1);
+
+        // Check re-connection
+        channelHandler.reconnect();
+        assertTrue(channelHandler.listeningChannel.isOpen());
+        assertEquals(channelHandler.channelGroup.size(), 1);
+
+        // Exercise closeable with re-connection trial
+        channelHandler.close();
+        assertFalse(channelHandler.listeningChannel.isOpen());
+        assertEquals(channelHandler.channelGroup.size(), 0);
+        try {
+            channelHandler.reconnect();
+            // NOTE(review): if reconnect() returns without throwing, nothing is asserted on this path and the
+            // test still passes - confirm whether a failure is intended here
+        } catch (ChannelException e) {
+            // Expected: Can't reconnect after closing
+            assertFalse(channelHandler.listeningChannel.isOpen());
+            assertEquals(channelHandler.channelGroup.size(), 0);
+        }
+
+    }
+
+    /**
+     * Drives a Netty client against the channel handler and checks connection handling on both sides: the
+     * client cannot connect before the handler is listening, it can connect afterwards, and client channels
+     * end up closed when the client closes them, when the handler closes its connection and when the handler
+     * reconnects.
+     *
+     * Connection futures are awaited with ChannelFuture.await() instead of spinning on isDone(), so the test
+     * thread blocks without burning CPU and remains interruptible. The channel group exposes no future to wait
+     * on, so its size is still polled.
+     */
+    @Test(timeOut = 10_000)
+    public void testNettyConnectionToTSOFromClient() throws Exception {
+
+        ClientBootstrap nettyClient = createNettyClientBootstrap();
+
+        ChannelFuture channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
+
+        // ------------------------------------------------------------------------------------------------------------
+        // Test the client can't connect because the server is not there
+        // ------------------------------------------------------------------------------------------------------------
+        channelF.await();
+        assertFalse(channelF.isSuccess());
+
+        // ------------------------------------------------------------------------------------------------------------
+        // Test creation of a server connection
+        // ------------------------------------------------------------------------------------------------------------
+        channelHandler.reconnect();
+        assertTrue(channelHandler.listeningChannel.isOpen());
+        // Eventually the channel group of the server should contain the listening channel
+        assertEquals(channelHandler.channelGroup.size(), 1);
+
+        // ------------------------------------------------------------------------------------------------------------
+        // Test that a client can connect now
+        // ------------------------------------------------------------------------------------------------------------
+        channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
+        channelF.await();
+        assertTrue(channelF.isSuccess());
+        assertTrue(channelF.getChannel().isConnected());
+        // Eventually the channel group of the server should have 2 elements
+        while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
+
+        // ------------------------------------------------------------------------------------------------------------
+        // Close the channel on the client side and test we have one element less in the channel group
+        // ------------------------------------------------------------------------------------------------------------
+        channelF.getChannel().close().await();
+        // Eventually the channel group of the server should have only one element
+        while (channelHandler.channelGroup.size() != 1) /** do nothing */ ;
+
+        // ------------------------------------------------------------------------------------------------------------
+        // Open a new channel and test the connection closing on the server side through the channel handler
+        // ------------------------------------------------------------------------------------------------------------
+        channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
+        channelF.await();
+        assertTrue(channelF.isSuccess());
+        // Eventually the channel group of the server should have 2 elements again
+        while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
+        channelHandler.closeConnection();
+        assertFalse(channelHandler.listeningChannel.isOpen());
+        assertEquals(channelHandler.channelGroup.size(), 0);
+        // Wait some time and check the channel was closed
+        TimeUnit.SECONDS.sleep(1);
+        assertFalse(channelF.getChannel().isOpen());
+
+        // ------------------------------------------------------------------------------------------------------------
+        // Test server re-connections with connected clients
+        // ------------------------------------------------------------------------------------------------------------
+        // Connect first time
+        channelHandler.reconnect();
+        assertTrue(channelHandler.listeningChannel.isOpen());
+        // Eventually the channel group of the server should contain the listening channel
+        assertEquals(channelHandler.channelGroup.size(), 1);
+        // Check the client can connect
+        channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
+        channelF.await();
+        assertTrue(channelF.isSuccess());
+        // Eventually the channel group of the server should have 2 elements
+        while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
+        // Re-connect and check that client connection was gone
+        channelHandler.reconnect();
+        assertTrue(channelHandler.listeningChannel.isOpen());
+        // Eventually the channel group of the server should contain the listening channel
+        assertEquals(channelHandler.channelGroup.size(), 1);
+        // Wait some time and check the channel was closed
+        TimeUnit.SECONDS.sleep(1);
+        assertFalse(channelF.getChannel().isOpen());
+
+        // ------------------------------------------------------------------------------------------------------------
+        // Test closeable interface with re-connection trial
+        // ------------------------------------------------------------------------------------------------------------
+        channelHandler.close();
+        assertFalse(channelHandler.listeningChannel.isOpen());
+        assertEquals(channelHandler.channelGroup.size(), 0);
+    }
+
+    /**
+     * Connects a Netty client to the channel handler, sends the initial handshake request and then checks that
+     * timestamp and commit requests written by the client reach the request processor mock.
+     *
+     * The connection future is awaited with ChannelFuture.await() instead of spinning on isDone(), so the test
+     * thread blocks without burning CPU and remains interruptible.
+     */
+    @Test(timeOut = 10_000)
+    public void testNettyChannelWriting() throws Exception {
+
+        // ------------------------------------------------------------------------------------------------------------
+        // Prepare test
+        // ------------------------------------------------------------------------------------------------------------
+
+        // Connect channel handler
+        channelHandler.reconnect();
+        // Create client and connect it
+        ClientBootstrap nettyClient = createNettyClientBootstrap();
+        ChannelFuture channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
+        // Basic checks for connection
+        channelF.await();
+        assertTrue(channelF.isSuccess());
+        assertTrue(channelF.getChannel().isConnected());
+        Channel channel = channelF.getChannel();
+        // Eventually the channel group of the server should have 2 elements
+        while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
+        // Write first handshake request
+        TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
+        // NOTE: Add here the required handshake capabilities when necessary
+        handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
+        channel.write(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
+
+        // ------------------------------------------------------------------------------------------------------------
+        // Test channel writing
+        // ------------------------------------------------------------------------------------------------------------
+        testWritingTimestampRequest(channel);
+
+        testWritingCommitRequest(channel);
+    }
+
+    /**
+     * Writes a timestamp request into the given channel and verifies that the request processor mock is
+     * invoked for a timestamp request and not for a commit request.
+     *
+     * @param channel client channel the request is written into
+     * @throws InterruptedException if interrupted while waiting for the write to complete
+     */
+    private void testWritingTimestampRequest(Channel channel) throws InterruptedException {
+        // Start from a clean mock so that earlier interactions are not counted
+        reset(requestProcessor);
+        TSOProto.Request timestampRequest = TSOProto.Request.newBuilder()
+                .setTimestampRequest(TSOProto.TimestampRequest.newBuilder().build())
+                .build();
+        // Write into the channel
+        channel.write(timestampRequest).await();
+        verify(requestProcessor, timeout(100).times(1))
+                .timestampRequest(any(Channel.class), any(MonitoringContext.class));
+        verify(requestProcessor, timeout(100).never())
+                .commitRequest(anyLong(), anyCollectionOf(Long.class), anyBoolean(), any(Channel.class),
+                        any(MonitoringContext.class));
+    }
+
+    /**
+     * Writes a commit request (start timestamp 666, one cell id 666) into the given channel and verifies that
+     * the request processor mock is invoked for a commit request and not for a timestamp request.
+     *
+     * @param channel client channel the request is written into
+     * @throws InterruptedException if interrupted while waiting for the write to complete
+     */
+    private void testWritingCommitRequest(Channel channel) throws InterruptedException {
+        // Start from a clean mock so that earlier interactions are not counted
+        reset(requestProcessor);
+        TSOProto.CommitRequest commit = TSOProto.CommitRequest.newBuilder()
+                .setStartTimestamp(666)
+                .addCellId(666)
+                .build();
+        TSOProto.Request.Builder requestBuilder = TSOProto.Request.newBuilder().setCommitRequest(commit);
+        // Sanity check: the built request really carries the commit request
+        assertTrue(requestBuilder.build().hasCommitRequest());
+        // Write into the channel
+        channel.write(requestBuilder.build()).await();
+        verify(requestProcessor, timeout(100).never())
+                .timestampRequest(any(Channel.class), any(MonitoringContext.class));
+        verify(requestProcessor, timeout(100).times(1))
+                .commitRequest(eq(666L), anyCollectionOf(Long.class), eq(false), any(Channel.class),
+                        any(MonitoringContext.class));
+    }
+
+    // ----------------------------------------------------------------------------------------------------------------
+    // Helper methods
+    // ----------------------------------------------------------------------------------------------------------------
+
+    /**
+     * Builds a Netty client bootstrap whose pipeline frames messages with a 4-byte length field, decodes
+     * TSOProto responses, encodes protobuf messages and ends in a handler that only logs connection and error
+     * events.
+     *
+     * @return the configured, not yet connected, client bootstrap
+     */
+    private ClientBootstrap createNettyClientBootstrap() {
+
+        // Terminal handler: it just logs what happens on the channel
+        SimpleChannelHandler loggingHandler = new SimpleChannelHandler() {
+
+            @Override
+            public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+                LOG.info("Channel {} connected", ctx.getChannel());
+            }
+
+            @Override
+            public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+                LOG.error("Error on channel {}", ctx.getChannel(), e.getCause());
+            }
+
+        };
+
+        // Boss and worker threads come from cached pools with recognizable thread names
+        ChannelFactory clientChannelFactory = new NioClientSocketChannelFactory(
+                Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("client-boss-%d").build()),
+                Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("client-worker-%d").build()),
+                1);
+
+        // Create the bootstrap and set the socket options
+        ClientBootstrap clientBootstrap = new ClientBootstrap(clientChannelFactory);
+        clientBootstrap.setOption("tcpNoDelay", true);
+        clientBootstrap.setOption("keepAlive", true);
+        clientBootstrap.setOption("reuseAddress", true);
+        clientBootstrap.setOption("connectTimeoutMillis", 100);
+
+        // Assemble the pipeline; the order of the handlers matters
+        ChannelPipeline clientPipeline = clientBootstrap.getPipeline();
+        clientPipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
+        clientPipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+        clientPipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
+        clientPipeline.addLast("protobufencoder", new ProtobufEncoder());
+        clientPipeline.addLast("handler", loggingHandler);
+
+        return clientBootstrap;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/TestTSOStateManager.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTSOStateManager.java b/tso-server/src/test/java/org/apache/omid/tso/TestTSOStateManager.java
new file mode 100644
index 0000000..350db5f
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTSOStateManager.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import org.apache.omid.tso.TSOStateManager.StateObserver;
+import org.apache.omid.tso.TSOStateManager.TSOState;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+
+/**
+ * Tests for TSOStateManagerImpl: resetting the state from the value reported by a mocked TimestampOracle, and
+ * registering/unregistering observers that are notified when the state is reset.
+ */
+public class TestTSOStateManager {
+
+    private static final long INITIAL_STATE_VALUE = 1L;
+    private static final long NEW_STATE_VALUE = 1000L;
+
+    // Mocks
+    private final TimestampOracle timestampOracle = mock(TimestampOracle.class);
+
+    // Component under test
+    private final TSOStateManager stateManager = new TSOStateManagerImpl(timestampOracle);
+
+    /**
+     * Makes the mocked Timestamp Oracle report the initial state value before each test.
+     */
+    @BeforeMethod
+    public void beforeMethod() {
+        // Initialize the state with the one reported by the Timestamp Oracle
+        when(timestampOracle.getLast()).thenReturn(INITIAL_STATE_VALUE);
+    }
+
+    /**
+     * Checks that each reset() returns a state whose low watermark and epoch both equal the value the
+     * Timestamp Oracle reports at that moment.
+     */
+    @Test
+    public void testResetOfTSOServerState() throws Exception {
+
+        // Reset the state and check we get the initial state values
+        TSOState initialState = stateManager.reset();
+        assertEquals(initialState.getLowWatermark(), INITIAL_STATE_VALUE);
+        assertEquals(initialState.getEpoch(), INITIAL_STATE_VALUE);
+        assertTrue("In this implementation low watermark should be equal to epoch",
+                initialState.getLowWatermark() == initialState.getEpoch());
+
+        // Then, simulate a change in the state returned by the Timestamp Oracle...
+        when(timestampOracle.getLast()).thenReturn(NEW_STATE_VALUE);
+        // ... and again, reset the state and check we get the new values
+        TSOState secondState = stateManager.reset();
+        assertEquals(secondState.getLowWatermark(), NEW_STATE_VALUE);
+        assertEquals(secondState.getEpoch(), NEW_STATE_VALUE);
+        assertTrue("In this implementation low watermark should be equal to epoch",
+                secondState.getLowWatermark() == secondState.getEpoch());
+
+    }
+
+    /**
+     * Checks that registered observers are notified with the state produced by reset() and that an
+     * unregistered observer is not notified with the state of a later reset().
+     */
+    @Test
+    public void testObserverRegistrationAndDeregistrationForStateChanges() throws Exception {
+
+        // Register observer 1 for receiving state changes
+        StateObserver observer1 = spy(new DummyObserver());
+        stateManager.register(observer1);
+
+        // Reset the state to trigger observer notifications
+        TSOState state = stateManager.reset();
+
+        // Check observer 1 was notified with the corresponding state
+        verify(observer1, timeout(100).times(1)).update(eq(state));
+
+        // Register observer 2 for receiving state changes
+        StateObserver observer2 = spy(new DummyObserver());
+        stateManager.register(observer2);
+
+        // Again, reset the state to trigger observer notifications
+        state = stateManager.reset();
+
+        // Check both observers were notified with the corresponding state
+        verify(observer1, timeout(100).times(1)).update(eq(state));
+        verify(observer2, timeout(100).times(1)).update(eq(state));
+
+        // De-register observer 1
+        stateManager.unregister(observer1);
+
+        // Again, reset the state to trigger observer notifications
+        state = stateManager.reset();
+
+        // Check only observer 2 was notified
+        // NOTE(review): timeout(100).times(0) appears to be satisfied as soon as it is evaluated, so it may not
+        // catch a late notification - confirm against the Mockito version in use
+        verify(observer1, timeout(100).times(0)).update(eq(state));
+        verify(observer2, timeout(100).times(1)).update(eq(state));
+    }
+
+    // ------------------------------------------------------------------------
+    // -------------------------- Helper classes ------------------------------
+    // ------------------------------------------------------------------------
+
+    /**
+     * Observer that ignores every update; the tests wrap it in a Mockito spy to record the calls. It is a
+     * static nested class because it uses no state of the enclosing test instance.
+     */
+    private static class DummyObserver implements StateObserver {
+
+        @Override
+        public void update(TSOState state) throws IOException {
+            // Intentionally empty: only the invocation itself matters to the tests
+        }
+
+    }
+
+}