You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by ik...@apache.org on 2016/04/21 02:02:33 UTC
[16/52] [abbrv] incubator-omid git commit: Move com.yahoo ->
org.apache
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
new file mode 100644
index 0000000..34ced79
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.metrics.NullMetricsProvider;
+import org.jboss.netty.channel.Channel;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+
+public class TestBatch {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestBatch.class);
+
+ private static final int BATCH_SIZE = 1000;
+ private MetricsRegistry metrics = new NullMetricsProvider();
+
+ @Mock
+ private Channel channel;
+ @Mock
+ private RetryProcessor retryProcessor;
+ @Mock
+ private ReplyProcessor replyProcessor;
+
+ // The batch element to test
+ private PersistenceProcessorImpl.Batch batch;
+
+ @BeforeMethod(alwaysRun = true, timeOut = 30_000)
+ public void initMocksAndComponents() {
+ MockitoAnnotations.initMocks(this);
+ batch = new PersistenceProcessorImpl.Batch(BATCH_SIZE);
+ }
+
+ @Test
+ public void testBatchFunctionality() {
+
+ // Required mocks
+ Channel channel = Mockito.mock(Channel.class);
+ ReplyProcessor replyProcessor = Mockito.mock(ReplyProcessor.class);
+ RetryProcessor retryProcessor = Mockito.mock(RetryProcessor.class);
+
+ // The batch element to test
+ PersistenceProcessorImpl.Batch batch = new PersistenceProcessorImpl.Batch(BATCH_SIZE);
+
+ // Test initial state is OK
+ AssertJUnit.assertFalse("Batch shouldn't be full", batch.isFull());
+ AssertJUnit.assertEquals("Num events should be 0", 0, batch.getNumEvents());
+
+ // Test adding a single commit event is OK
+ MonitoringContext monCtx = new MonitoringContext(metrics);
+ monCtx.timerStart("commitPersistProcessor");
+ batch.addCommit(0, 1, channel, monCtx);
+ AssertJUnit.assertFalse("Batch shouldn't be full", batch.isFull());
+ AssertJUnit.assertEquals("Num events should be 1", 1, batch.getNumEvents());
+
+ // Test when filling the batch with events, batch is full
+ for (int i = 0; i < (BATCH_SIZE - 1); i++) {
+ if (i % 2 == 0) {
+ monCtx = new MonitoringContext(metrics);
+ monCtx.timerStart("timestampPersistProcessor");
+ batch.addTimestamp(i, channel, monCtx);
+ } else {
+ monCtx = new MonitoringContext(metrics);
+ monCtx.timerStart("commitPersistProcessor");
+ batch.addCommit(i, i + 1, channel, monCtx);
+ }
+ }
+ AssertJUnit.assertTrue("Batch should be full", batch.isFull());
+ AssertJUnit.assertEquals("Num events should be " + BATCH_SIZE, BATCH_SIZE, batch.getNumEvents());
+
+ // Test an exception is thrown when batch is full and a new element is going to be added
+ try {
+ monCtx = new MonitoringContext(metrics);
+ monCtx.timerStart("commitPersistProcessor");
+ batch.addCommit(0, 1, channel, new MonitoringContext(metrics));
+ Assert.fail("Should throw an IllegalStateException");
+ } catch (IllegalStateException e) {
+ AssertJUnit.assertEquals("message returned doesn't match", "batch full", e.getMessage());
+ LOG.debug("IllegalStateException catched properly");
+ }
+
+ // Test that sending replies empties the batch
+ final boolean MASTER_INSTANCE = true;
+ final boolean SHOULD_MAKE_HEURISTIC_DECISSION = true;
+ batch.sendRepliesAndReset(replyProcessor, retryProcessor, MASTER_INSTANCE);
+ verify(replyProcessor, timeout(100).times(BATCH_SIZE / 2))
+ .timestampResponse(anyLong(), any(Channel.class), any(MonitoringContext.class));
+ verify(replyProcessor, timeout(100).times(BATCH_SIZE / 2))
+ .commitResponse(eq(!SHOULD_MAKE_HEURISTIC_DECISSION), anyLong(), anyLong(),
+ any(Channel.class), any(MonitoringContext.class));
+ AssertJUnit.assertFalse("Batch shouldn't be full", batch.isFull());
+ AssertJUnit.assertEquals("Num events should be 0", 0, batch.getNumEvents());
+
+ }
+
+ @Test
+ public void testBatchFunctionalityWhenMastershipIsLost() {
+ Channel channel = Mockito.mock(Channel.class);
+
+ // Fill the batch with events till full
+ for (int i = 0; i < BATCH_SIZE; i++) {
+ if (i % 2 == 0) {
+ MonitoringContext monCtx = new MonitoringContext(metrics);
+ monCtx.timerStart("timestampPersistProcessor");
+ batch.addTimestamp(i, channel, monCtx);
+ } else {
+ MonitoringContext monCtx = new MonitoringContext(metrics);
+ monCtx.timerStart("commitPersistProcessor");
+ batch.addCommit(i, i + 1, channel, monCtx);
+ }
+ }
+
+ // Test that sending replies empties the batch also when the replica
+ // is NOT master and calls the ambiguousCommitResponse() method on the
+ // reply processor
+ final boolean MASTER_INSTANCE = true;
+ final boolean SHOULD_MAKE_HEURISTIC_DECISSION = true;
+ batch.sendRepliesAndReset(replyProcessor, retryProcessor, !MASTER_INSTANCE);
+ verify(replyProcessor, timeout(100).times(BATCH_SIZE / 2))
+ .timestampResponse(anyLong(), any(Channel.class), any(MonitoringContext.class));
+ verify(replyProcessor, timeout(100).times(BATCH_SIZE / 2))
+ .commitResponse(eq(SHOULD_MAKE_HEURISTIC_DECISSION), anyLong(), anyLong(), any(Channel.class), any(
+ MonitoringContext.class));
+ assertFalse(batch.isFull(), "Batch shouldn't be full");
+ assertEquals(batch.getNumEvents(), 0, "Num events should be 0");
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/TestLeaseManager.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestLeaseManager.java b/tso-server/src/test/java/org/apache/omid/tso/TestLeaseManager.java
new file mode 100644
index 0000000..2eb4244
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestLeaseManager.java
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.common.base.Charsets;
+import org.apache.omid.TestUtils;
+import org.apache.omid.tso.TSOStateManager.TSOState;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.CloseableUtils;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+import static org.apache.omid.tso.client.TSOClient.DEFAULT_ZK_CLUSTER;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.AssertJUnit.assertFalse;
+import static org.testng.AssertJUnit.assertTrue;
+
+public class TestLeaseManager {
+
+ private static final long DUMMY_EPOCH_1 = 1L;
+ private static final long DUMMY_EPOCH_2 = 2L;
+ private static final long DUMMY_EPOCH_3 = 3L;
+ private static final long DUMMY_LOW_WATERMARK_1 = DUMMY_EPOCH_1;
+ private static final long DUMMY_LOW_WATERMARK_2 = DUMMY_EPOCH_2;
+ private static final long DUMMY_LOW_WATERMARK_3 = DUMMY_EPOCH_3;
+
+ private static final String LEASE_MGR_ID_1 = "LM1";
+ private static final String LEASE_MGR_ID_2 = "LM2";
+ private static final String INSTANCE_ID_1 = "LM1" + "#";
+ private static final String INSTANCE_ID_2 = "LM2" + "#";
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestLeaseManager.class);
+
+ private static final long TEST_LEASE_PERIOD_IN_MS = 1000; // 1 second
+
+ private CuratorFramework zkClient;
+ private TestingServer zkServer;
+
+ @Mock
+ private Panicker panicker;
+
+ private PausableLeaseManager leaseManager1;
+ private PausableLeaseManager leaseManager2;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+
+ LOG.info("Starting ZK Server");
+ zkServer = TestUtils.provideTestingZKServer();
+ LOG.info("ZK Server Started @ {}", zkServer.getConnectString());
+
+ zkClient = TestUtils.provideConnectedZKClient(DEFAULT_ZK_CLUSTER);
+
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception {
+
+ zkClient.close();
+
+ CloseableUtils.closeQuietly(zkServer);
+ zkServer = null;
+ LOG.info("ZK Server Stopped");
+
+ }
+
+ @Test(timeOut = 30_000)
+ public void testErrorInitializingTSOStateExitsTheTSO() throws Exception {
+
+ final String TEST_TSO_LEASE_PATH = "/test0_tsolease";
+ final String TEST_CURRENT_TSO_PATH = "/test0_currenttso";
+
+ Panicker panicker = spy(new MockPanicker());
+
+ TSOChannelHandler tsoChannelHandler = mock(TSOChannelHandler.class);
+ TSOStateManager stateManager = mock(TSOStateManager.class);
+ when(stateManager.reset()).thenThrow(new IOException());
+ leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
+ tsoChannelHandler,
+ stateManager,
+ TEST_LEASE_PERIOD_IN_MS,
+ TEST_TSO_LEASE_PATH,
+ TEST_CURRENT_TSO_PATH,
+ zkClient,
+ panicker);
+ leaseManager1.startService();
+
+ // ... let the test run for some time...
+ Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+ verify(panicker, timeout(2000).atLeastOnce()).panic(anyString(), any(IOException.class));
+
+ leaseManager1.stopService();
+
+ }
+
+ @Test(timeOut = 60000)
+ public void testLeaseHolderDoesNotChangeWhenPausedForALongTimeAndTheresNoOtherInstance()
+ throws Exception {
+
+ final String TEST_TSO_LEASE_PATH = "/test1_tsolease";
+ final String TEST_CURRENT_TSO_PATH = "/test1_currenttso";
+
+ // Launch the instance under test...
+ TSOChannelHandler tsoChannelHandler1 = mock(TSOChannelHandler.class);
+ TSOStateManager stateManager1 = mock(TSOStateManager.class);
+ when(stateManager1.reset()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
+ leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
+ tsoChannelHandler1,
+ stateManager1,
+ TEST_LEASE_PERIOD_IN_MS,
+ TEST_TSO_LEASE_PATH,
+ TEST_CURRENT_TSO_PATH,
+ zkClient,
+ panicker);
+ leaseManager1.startService();
+
+ // ... let the test run for some time...
+ Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+ // ... check is the lease holder
+ checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
+ checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
+ assertTrue(leaseManager1.stillInLeasePeriod());
+
+ // Then, pause instance when trying to renew lease...
+ leaseManager1.pausedInTryToRenewLeasePeriod();
+
+ // ...let the test run for some time...
+ Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+ // ...check that nothing changed...
+ checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
+ checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
+
+ // Finally, resume the instance...
+ leaseManager1.resume();
+
+ // ... let the test run for some time...
+ Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+ // ... and check again that nothing changed
+ checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
+ checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
+ assertTrue(leaseManager1.stillInLeasePeriod());
+
+ }
+
+ @Test(timeOut = 60_000)
+ public void testLeaseHolderDoesNotChangeWhenANewLeaseManagerIsUp() throws Exception {
+
+ final String TEST_TSO_LEASE_PATH = "/test2_tsolease";
+ final String TEST_CURRENT_TSO_PATH = "/test2_currenttso";
+
+ // Launch the master instance...
+ TSOChannelHandler tsoChannelHandler1 = mock(TSOChannelHandler.class);
+ TSOStateManager stateManager1 = mock(TSOStateManager.class);
+ when(stateManager1.reset()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
+ leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
+ tsoChannelHandler1,
+ stateManager1,
+ TEST_LEASE_PERIOD_IN_MS,
+ TEST_TSO_LEASE_PATH,
+ TEST_CURRENT_TSO_PATH,
+ zkClient,
+ panicker);
+
+ leaseManager1.startService();
+
+ // ...let the test run for some time...
+ Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+ // ...so it should be the current holder of the lease
+ checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
+ checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
+ assertTrue(leaseManager1.stillInLeasePeriod());
+
+ // Then launch another instance...
+ TSOChannelHandler tsoChannelHandler2 = mock(TSOChannelHandler.class);
+ TSOStateManager stateManager2 = mock(TSOStateManager.class);
+ when(stateManager2.reset()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_2, DUMMY_EPOCH_2));
+ leaseManager2 = new PausableLeaseManager(LEASE_MGR_ID_2,
+ tsoChannelHandler2,
+ stateManager2,
+ TEST_LEASE_PERIOD_IN_MS,
+ TEST_TSO_LEASE_PATH,
+ TEST_CURRENT_TSO_PATH,
+ zkClient,
+ panicker);
+ leaseManager2.startService();
+
+ // ... let the test run for some time...
+ Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+ // ... and after the period, the first instance should be still the holder
+ checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
+ checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
+ assertTrue(leaseManager1.stillInLeasePeriod());
+ assertFalse(leaseManager2.stillInLeasePeriod());
+ }
+
+ @Test(timeOut = 60_000)
+ public void testLeaseHolderChangesWhenActiveLeaseManagerIsPaused() throws Exception {
+
+ final String TEST_TSO_LEASE_PATH = "/test3_tsolease";
+ final String TEST_CURRENT_TSO_PATH = "/test3_currenttso";
+
+ // Launch the master instance...
+ TSOChannelHandler tsoChannelHandler1 = mock(TSOChannelHandler.class);
+ TSOStateManager stateManager1 = mock(TSOStateManager.class);
+ when(stateManager1.reset()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
+ leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1,
+ tsoChannelHandler1,
+ stateManager1,
+ TEST_LEASE_PERIOD_IN_MS,
+ TEST_TSO_LEASE_PATH,
+ TEST_CURRENT_TSO_PATH,
+ zkClient,
+ panicker);
+
+ leaseManager1.startService();
+
+ // ... let the test run for some time...
+ Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+ // ... so it should be the current holder of the lease
+ checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
+ checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "1");
+ assertTrue(leaseManager1.stillInLeasePeriod());
+
+ // Then launch another instance...
+ TSOChannelHandler tsoChannelHandler2 = mock(TSOChannelHandler.class);
+ TSOStateManager stateManager2 = mock(TSOStateManager.class);
+ when(stateManager2.reset()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_2, DUMMY_EPOCH_2));
+ leaseManager2 = new PausableLeaseManager(LEASE_MGR_ID_2,
+ tsoChannelHandler2,
+ stateManager2,
+ TEST_LEASE_PERIOD_IN_MS,
+ TEST_TSO_LEASE_PATH,
+ TEST_CURRENT_TSO_PATH,
+ zkClient,
+ panicker);
+ leaseManager2.startService();
+
+ // ... and pause active lease manager...
+ leaseManager1.pausedInStillInLeasePeriod();
+
+ // ... and let the test run for some time...
+ Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+ // ... and check that lease owner should have changed to the second instance
+ checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_2);
+ checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_2 + "2");
+ assertTrue(leaseManager2.stillInLeasePeriod());
+
+ // Now, lets resume the first instance...
+ when(stateManager1.reset()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_3, DUMMY_EPOCH_3));
+ leaseManager1.resume();
+
+ // ... let the test run for some time...
+ Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+ // and check the lease owner is still the second instance (preserves the lease)
+ checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_2);
+ checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_2 + "2");
+ assertFalse(leaseManager1.stillInLeasePeriod());
+ assertTrue(leaseManager2.stillInLeasePeriod());
+
+ // Finally, pause active lease manager when trying to renew lease...
+ leaseManager2.pausedInTryToRenewLeasePeriod();
+
+ // ... let the test run for some time...
+ Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+ // ... and check lease owner is has changed again to the first instance
+ checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
+ checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "3");
+ assertFalse(leaseManager2.stillInLeasePeriod());
+ assertTrue(leaseManager1.stillInLeasePeriod());
+
+ // Resume the second instance...
+ leaseManager2.resume();
+
+ // ... let the test run for some time...
+ Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+ // ... but the lease owner should still be the first instance
+ checkLeaseHolder(TEST_TSO_LEASE_PATH, LEASE_MGR_ID_1);
+ checkInstanceId(TEST_CURRENT_TSO_PATH, INSTANCE_ID_1 + "3");
+ assertFalse(leaseManager2.stillInLeasePeriod());
+ assertTrue(leaseManager1.stillInLeasePeriod());
+
+ }
+
+
+ @Test(timeOut = 40_000)
+ public void testLeaseManagerPanicsWhenUnexpectedInfoIsFoundInCurrentTSOZnode() throws Exception {
+
+ final String TEST_TSO_LEASE_PATH = "/test_wronginfo_tsolease";
+ final String TEST_CURRENT_TSO_PATH = "/test_wronginfo_currenttso";
+
+ Panicker panicker = spy(new MockPanicker());
+
+ // Launch the master instance...
+ TSOStateManager stateManager1 = mock(TSOStateManager.class);
+ when(stateManager1.reset()).thenReturn(new TSOState(DUMMY_LOW_WATERMARK_1, DUMMY_EPOCH_1));
+ PausableLeaseManager leaseManager = new PausableLeaseManager(LEASE_MGR_ID_1,
+ mock(TSOChannelHandler.class),
+ stateManager1,
+ TEST_LEASE_PERIOD_IN_MS,
+ TEST_TSO_LEASE_PATH,
+ TEST_CURRENT_TSO_PATH,
+ zkClient,
+ panicker);
+
+ leaseManager.startService();
+ // ...and let the test run for some time...
+ Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+ leaseManager.pausedInTryToRenewLeasePeriod();
+
+ // 1st Panic test) Inject corrupted data in the ZNode, force reelection and test the panicker is exercised
+ zkClient.setData().forPath(TEST_CURRENT_TSO_PATH, "CorruptedData!!!".getBytes());
+
+ // ...and let the test run for some time...
+ Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+ leaseManager.resume();
+ // ...and let the test run for some time...
+ Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+ ArgumentCaptor<IllegalArgumentException> trowableIAE = ArgumentCaptor.forClass(IllegalArgumentException.class);
+ verify(panicker).panic(anyString(), trowableIAE.capture());
+ assertTrue(trowableIAE.getValue() instanceof IllegalArgumentException);
+ assertTrue(trowableIAE.getValue().getMessage().contains("Incorrect TSO Info found"));
+
+ // 2nd Panic test) Simulate that a new master appeared in the meantime, force reelection
+ // and test the panicker is exercised
+ reset(panicker);
+ zkClient.setData().forPath(TEST_CURRENT_TSO_PATH, "newTSO:12345#10000".getBytes());
+
+ leaseManager.pausedInTryToRenewLeasePeriod();
+
+ // ...and let the test run for some time...
+ Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+ leaseManager.resume();
+ // ...and let the test run for some time...
+ Thread.sleep(TEST_LEASE_PERIOD_IN_MS * 2);
+
+ ArgumentCaptor<LeaseManagement.LeaseManagementException> trowableLME =
+ ArgumentCaptor.forClass(LeaseManagement.LeaseManagementException.class);
+ verify(panicker).panic(anyString(), trowableLME.capture());
+ assertTrue(trowableLME.getValue() instanceof LeaseManagement.LeaseManagementException);
+ assertTrue(trowableLME.getValue().getMessage().contains("Another TSO replica was found"));
+ }
+
+ @Test(timeOut = 1000)
+ public void testNonHALeaseManager() throws Exception {
+
+ // Launch the instance...
+ VoidLeaseManager leaseManager = new VoidLeaseManager(mock(TSOChannelHandler.class),
+ mock(TSOStateManager.class));
+
+ leaseManager.startService();
+ assertTrue(leaseManager.stillInLeasePeriod());
+ leaseManager.stopService();
+
+ }
+
+ // ----------------------------------------------------------------------------------------------------------------
+ // Checkers
+ // ----------------------------------------------------------------------------------------------------------------
+
+ private void checkLeaseHolder(String tsoLeasePath, String expectedLeaseHolder) throws Exception {
+ byte[] leaseHolderInBytes = zkClient.getData().forPath(tsoLeasePath);
+ String leaseHolder = new String(leaseHolderInBytes, Charsets.UTF_8);
+
+ assertEquals(leaseHolder, expectedLeaseHolder);
+ }
+
+ private void checkInstanceId(String currentTSOPath, String expectedInstanceId) throws Exception {
+ byte[] expectedInstanceIdInBytes = zkClient.getData().forPath(currentTSOPath);
+ String instanceId = new String(expectedInstanceIdInBytes, Charsets.UTF_8);
+
+ assertEquals(instanceId, expectedInstanceId);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/TestLongCache.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestLongCache.java b/tso-server/src/test/java/org/apache/omid/tso/TestLongCache.java
new file mode 100644
index 0000000..6a88686
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestLongCache.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+
+public class TestLongCache {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestLongCache.class);
+
+ private static final long TEST_VALUE = 1000;
+
+ private Random random = new Random(System.currentTimeMillis());
+
+ @Test
+ public void testAddAndGetElemsAndResetCache() {
+
+ // Cache configuration
+ final int CACHE_SIZE = 10_000_000;
+ final int CACHE_ASSOCIATIVITY = 32;
+ Cache cache = new LongCache(CACHE_SIZE, CACHE_ASSOCIATIVITY);
+
+ // After creation, cache values should be the default
+ for (int i = 0; i < 1000; i++) {
+ long position = random.nextLong();
+ assertEquals(cache.get(position), LongCache.RESET_VALUE);
+ }
+
+ Set<Long> testedKeys = new TreeSet<>();
+ // Populate some of the values
+ for (int i = 0; i < 1000; i++) {
+ long position = random.nextLong();
+ cache.set(position, TEST_VALUE);
+ testedKeys.add(position);
+ }
+
+ // Get the values and check them
+ for (long key : testedKeys) {
+ assertEquals(cache.get(key), TEST_VALUE);
+ }
+
+ // Reset cache and check the values are the default again
+ long startTimeInMs = System.currentTimeMillis();
+ cache.reset();
+ long endTimeInMs = System.currentTimeMillis();
+ long resetTimeInMs = endTimeInMs - startTimeInMs;
+ LOG.info("Time in reseting cache of {}/{} elems/asoc {}ms", CACHE_SIZE, CACHE_ASSOCIATIVITY, resetTimeInMs);
+
+ for (long key : testedKeys) {
+ assertEquals(cache.get(key), LongCache.RESET_VALUE);
+ }
+
+ }
+
+ @Test(timeOut = 10000)
+ public void testEntriesAge() {
+
+ final int entries = 1000;
+
+ Cache cache = new LongCache(entries, 16);
+
+ int removals = 0;
+ long totalAge = 0;
+ double tempStdDev = 0;
+ double tempAvg = 0;
+
+ int i = 0;
+ int largestDeletedTimestamp = 0;
+ for (; i < entries * 10; ++i) {
+ long removed = cache.set(random.nextLong(), i);
+ if (removed > largestDeletedTimestamp) {
+ largestDeletedTimestamp = (int) removed;
+ }
+ }
+
+ long time = System.nanoTime();
+ for (; i < entries * 100; ++i) {
+ long removed = cache.set(random.nextLong(), i);
+ if (removed > largestDeletedTimestamp) {
+ largestDeletedTimestamp = (int) removed;
+ }
+ int gap = i - ((int) largestDeletedTimestamp);
+ removals++;
+ totalAge += gap;
+ double oldAvg = tempAvg;
+ tempAvg += (gap - tempAvg) / removals;
+ tempStdDev += (gap - oldAvg) * (gap - tempAvg);
+ }
+ long elapsed = System.nanoTime() - time;
+ LOG.info("Elapsed (ms): " + (elapsed / (double) 1000));
+
+ double avgGap = totalAge / (double) removals;
+ LOG.info("Avg gap: " + (tempAvg));
+ LOG.info("Std dev gap: " + Math.sqrt((tempStdDev / entries)));
+ assertTrue("avgGap should be greater than entries * 0.6",
+ avgGap > entries * 0.6);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
new file mode 100644
index 0000000..816194d
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.timestamp.storage.TimestampStorage;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+@SuppressWarnings({"UnusedDeclaration"})
+public class TestPanicker {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestPanicker.class);
+
+ @Mock
+ private CommitTable.Writer mockWriter;
+ @Mock
+ private MetricsRegistry metrics;
+
+ @BeforeMethod
+ public void initMocksAndComponents() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @AfterMethod
+ void afterMethod() {
+ Mockito.reset(mockWriter);
+ }
+
+ // Note this test has been moved and refactored to TestTimestampOracle because
+ // it tests the behaviour of the TimestampOracle.
+ // Please, remove me in a future commit
+ @Test
+ public void testTimestampOraclePanic() throws Exception {
+ TimestampStorage storage = spy(new TimestampOracleImpl.InMemoryTimestampStorage());
+ Panicker panicker = spy(new MockPanicker());
+
+ doThrow(new RuntimeException("Out of memory")).when(storage).updateMaxTimestamp(anyLong(), anyLong());
+
+ final TimestampOracleImpl tso = new TimestampOracleImpl(metrics, storage, panicker);
+ tso.initialize();
+ Thread allocThread = new Thread("AllocThread") {
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ tso.next();
+ }
+ } catch (IOException ioe) {
+ LOG.error("Shouldn't occur");
+ }
+ }
+ };
+ allocThread.start();
+
+ verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
+ }
+
+ // Note this test has been moved and refactored to TestPersistenceProcessor because
+ // it tests the behaviour of the PersistenceProcessor.
+ // Please, remove me in a future commit
+ @Test
+ public void testCommitTablePanic() throws Exception {
+ Panicker panicker = spy(new MockPanicker());
+
+ doThrow(new IOException("Unable to write@TestPanicker")).when(mockWriter).flush();
+
+ final CommitTable.Client mockClient = mock(CommitTable.Client.class);
+ CommitTable commitTable = new CommitTable() {
+ @Override
+ public Writer getWriter() {
+ return mockWriter;
+ }
+
+ @Override
+ public Client getClient() {
+ return mockClient;
+ }
+ };
+
+ LeaseManager leaseManager = mock(LeaseManager.class);
+ doReturn(true).when(leaseManager).stillInLeasePeriod();
+ PersistenceProcessor proc = new PersistenceProcessorImpl(new TSOServerConfig(),
+ metrics,
+ "localhost:1234",
+ leaseManager,
+ commitTable,
+ mock(ReplyProcessor.class),
+ mock(RetryProcessor.class),
+ panicker);
+ proc.persistCommit(1, 2, null, new MonitoringContext(metrics));
+ verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
+ }
+
+ // Note this test has been moved and refactored to TestPersistenceProcessor because
+ // it tests the behaviour of the PersistenceProcessor.
+ // Please, remove me in a future commit
+ @Test
+ public void testRuntimeExceptionTakesDownDaemon() throws Exception {
+ Panicker panicker = spy(new MockPanicker());
+
+ final CommitTable.Writer mockWriter = mock(CommitTable.Writer.class);
+ doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong());
+
+ final CommitTable.Client mockClient = mock(CommitTable.Client.class);
+ CommitTable commitTable = new CommitTable() {
+ @Override
+ public Writer getWriter() {
+ return mockWriter;
+ }
+
+ @Override
+ public Client getClient() {
+ return mockClient;
+ }
+ };
+ PersistenceProcessor proc = new PersistenceProcessorImpl(new TSOServerConfig(),
+ metrics,
+ "localhost:1234",
+ mock(LeaseManager.class),
+ commitTable,
+ mock(ReplyProcessor.class),
+ mock(RetryProcessor.class),
+ panicker);
+ proc.persistCommit(1, 2, null, new MonitoringContext(metrics));
+ verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
new file mode 100644
index 0000000..eb93277
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.metrics.NullMetricsProvider;
+import org.apache.omid.tso.PersistenceProcessorImpl.Batch;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+@SuppressWarnings({"UnusedDeclaration"})
+public class TestPersistenceProcessor {
+
+ @Mock
+ private Batch batch;
+ @Mock
+ private CommitTable.Writer mockWriter;
+ @Mock
+ private CommitTable.Client mockClient;
+ @Mock
+ private RetryProcessor retryProcessor;
+ @Mock
+ private ReplyProcessor replyProcessor;
+ @Mock
+ private Panicker panicker;
+
+ private MetricsRegistry metrics;
+ private CommitTable commitTable;
+
+ @BeforeMethod(alwaysRun = true, timeOut = 30_000)
+ public void initMocksAndComponents() throws Exception {
+
+ MockitoAnnotations.initMocks(this);
+
+ // Configure mock writer to flush successfully
+ doThrow(new IOException("Unable to write")).when(mockWriter).flush();
+
+ // Configure null metrics provider
+ metrics = new NullMetricsProvider();
+
+ // Configure commit table to return the mocked writer and client
+ commitTable = new CommitTable() {
+ @Override
+ public Writer getWriter() {
+ return mockWriter;
+ }
+
+ @Override
+ public Client getClient() {
+ return mockClient;
+ }
+ };
+ }
+
+ @AfterMethod
+ void afterMethod() {
+ Mockito.reset(mockWriter);
+ }
+
+ @Test
+ public void testCommitPersistenceWithNonHALeaseManager() throws Exception {
+
+ // Init a non-HA lease manager
+ VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
+ mock(TSOStateManager.class)));
+
+ TSOServerConfig tsoServerConfig = new TSOServerConfig();
+ tsoServerConfig.setBatchPersistTimeoutInMs(100);
+ // Component under test
+ PersistenceProcessor proc = new PersistenceProcessorImpl(tsoServerConfig,
+ metrics,
+ "localhost:1234",
+ batch,
+ leaseManager,
+ commitTable,
+ replyProcessor,
+ retryProcessor,
+ panicker);
+
+ // The non-ha lease manager always return true for
+ // stillInLeasePeriod(), so verify the batch sends replies as master
+ MonitoringContext monCtx = new MonitoringContext(metrics);
+ proc.persistCommit(1, 2, null, monCtx);
+ verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
+ verify(batch, timeout(1000).times(2)).sendRepliesAndReset(any(ReplyProcessor.class),
+ any(RetryProcessor.class),
+ eq(true));
+ }
+
+ @Test
+ public void testCommitPersistenceWithHALeaseManager() throws Exception {
+
+ // Init a HA lease manager
+ LeaseManager leaseManager = mock(LeaseManager.class);
+
+ TSOServerConfig tsoServerConfig = new TSOServerConfig();
+ tsoServerConfig.setBatchPersistTimeoutInMs(100);
+ // Component under test
+ PersistenceProcessor proc = new PersistenceProcessorImpl(tsoServerConfig,
+ metrics,
+ "localhost:1234",
+ batch,
+ leaseManager,
+ commitTable,
+ replyProcessor,
+ retryProcessor,
+ panicker);
+
+ // Configure the lease manager to always return true for
+ // stillInLeasePeriod, so verify the batch sends replies as master
+ doReturn(true).when(leaseManager).stillInLeasePeriod();
+ MonitoringContext monCtx = new MonitoringContext(metrics);
+ proc.persistCommit(1, 2, null, monCtx);
+ verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
+ verify(batch).sendRepliesAndReset(any(ReplyProcessor.class), any(RetryProcessor.class), eq(true));
+
+ // Configure the lease manager to always return true first and false
+ // later for stillInLeasePeriod, so verify the batch sends replies as
+ // non-master
+ reset(leaseManager);
+ reset(batch);
+ doReturn(true).doReturn(false).when(leaseManager).stillInLeasePeriod();
+ proc.persistCommit(1, 2, null, monCtx);
+ verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
+ verify(batch).sendRepliesAndReset(any(ReplyProcessor.class), any(RetryProcessor.class), eq(false));
+
+ // Configure the lease manager to always return false for
+ // stillInLeasePeriod, so verify the batch sends replies as non-master
+ reset(leaseManager);
+ reset(batch);
+ doReturn(false).when(leaseManager).stillInLeasePeriod();
+ proc.persistCommit(1, 2, null, monCtx);
+ verify(leaseManager, timeout(1000).times(1)).stillInLeasePeriod();
+ verify(batch).sendRepliesAndReset(any(ReplyProcessor.class), any(RetryProcessor.class), eq(false));
+
+ }
+
+ @Test
+ public void testCommitTableExceptionOnCommitPersistenceTakesDownDaemon() throws Exception {
+
+ // Init lease management (doesn't matter if HA or not)
+ LeaseManagement leaseManager = mock(LeaseManagement.class);
+ PersistenceProcessor proc = new PersistenceProcessorImpl(new TSOServerConfig(),
+ metrics,
+ "localhost:1234",
+ leaseManager,
+ commitTable,
+ mock(ReplyProcessor.class),
+ mock(RetryProcessor.class),
+ panicker);
+ MonitoringContext monCtx = new MonitoringContext(metrics);
+
+ // Configure lease manager to work normally
+ doReturn(true).when(leaseManager).stillInLeasePeriod();
+
+ // Configure commit table writer to explode when flushing changes to DB
+ doThrow(new IOException("Unable to write@TestPersistenceProcessor2")).when(mockWriter).flush();
+
+ // Check the panic is extended!
+ proc.persistCommit(1, 2, null, monCtx);
+ verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
+
+ }
+
+ @Test
+ public void testRuntimeExceptionOnCommitPersistenceTakesDownDaemon() throws Exception {
+
+ PersistenceProcessor proc = new PersistenceProcessorImpl(new TSOServerConfig(),
+ metrics,
+ "localhost:1234",
+ mock(LeaseManagement.class),
+ commitTable,
+ replyProcessor,
+ retryProcessor,
+ panicker);
+
+ // Configure writer to explode with a runtime exception
+ doThrow(new RuntimeException("Kaboom!")).when(mockWriter).addCommittedTransaction(anyLong(), anyLong());
+ MonitoringContext monCtx = new MonitoringContext(metrics);
+
+ // Check the panic is extended!
+ proc.persistCommit(1, 2, null, monCtx);
+ verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
new file mode 100644
index 0000000..c128beb
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.common.collect.Lists;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.metrics.NullMetricsProvider;
+import org.jboss.netty.channel.Channel;
+import org.mockito.ArgumentCaptor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.testng.AssertJUnit.assertTrue;
+
+public class TestRequestProcessor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestRequestProcessor.class);
+
+ private static final int CONFLICT_MAP_SIZE = 1000;
+ private static final int CONFLICT_MAP_ASSOCIATIVITY = 32;
+
+ private MetricsRegistry metrics = new NullMetricsProvider();
+
+ private PersistenceProcessor persist;
+
+ private TSOStateManager stateManager;
+
+ // Request processor under test
+ private RequestProcessor requestProc;
+
+ @BeforeMethod
+ public void beforeMethod() throws Exception {
+
+ // Build the required scaffolding for the test
+ MetricsRegistry metrics = new NullMetricsProvider();
+
+ TimestampOracleImpl timestampOracle =
+ new TimestampOracleImpl(metrics, new TimestampOracleImpl.InMemoryTimestampStorage(), new MockPanicker());
+
+ stateManager = new TSOStateManagerImpl(timestampOracle);
+
+ persist = mock(PersistenceProcessor.class);
+
+ TSOServerConfig config = new TSOServerConfig();
+ config.setMaxItems(CONFLICT_MAP_SIZE);
+
+ requestProc = new RequestProcessorImpl(config, metrics, timestampOracle, persist, new MockPanicker());
+
+ // Initialize the state for the experiment
+ stateManager.register(requestProc);
+ stateManager.reset();
+
+ }
+
+ @Test(timeOut = 30_000)
+ public void testTimestamp() throws Exception {
+
+ requestProc.timestampRequest(null, new MonitoringContext(metrics));
+ ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
+ verify(persist, timeout(100).times(1)).persistTimestamp(
+ firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+
+ long firstTS = firstTScapture.getValue();
+ // verify that timestamps increase monotonically
+ for (int i = 0; i < 100; i++) {
+ requestProc.timestampRequest(null, new MonitoringContext(metrics));
+ verify(persist, timeout(100).times(1)).persistTimestamp(eq(firstTS++), any(Channel.class), any(MonitoringContext.class));
+ }
+
+ }
+
+ @Test(timeOut = 30_000)
+ public void testCommit() throws Exception {
+
+ requestProc.timestampRequest(null, new MonitoringContext(metrics));
+ ArgumentCaptor<Long> TScapture = ArgumentCaptor.forClass(Long.class);
+ verify(persist, timeout(100).times(1)).persistTimestamp(
+ TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+ long firstTS = TScapture.getValue();
+
+ List<Long> writeSet = Lists.newArrayList(1L, 20L, 203L);
+ requestProc.commitRequest(firstTS - 1, writeSet, false, null, new MonitoringContext(metrics));
+ verify(persist, timeout(100).times(1)).persistAbort(eq(firstTS - 1), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+
+ requestProc.commitRequest(firstTS, writeSet, false, null, new MonitoringContext(metrics));
+ ArgumentCaptor<Long> commitTScapture = ArgumentCaptor.forClass(Long.class);
+
+ verify(persist, timeout(100).times(1)).persistCommit(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+ assertTrue("Commit TS must be greater than start TS", commitTScapture.getValue() > firstTS);
+
+ // test conflict
+ requestProc.timestampRequest(null, new MonitoringContext(metrics));
+ TScapture = ArgumentCaptor.forClass(Long.class);
+ verify(persist, timeout(100).times(2)).persistTimestamp(
+ TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+ long secondTS = TScapture.getValue();
+
+ requestProc.timestampRequest(null, new MonitoringContext(metrics));
+ TScapture = ArgumentCaptor.forClass(Long.class);
+ verify(persist, timeout(100).times(3)).persistTimestamp(
+ TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+ long thirdTS = TScapture.getValue();
+
+ requestProc.commitRequest(thirdTS, writeSet, false, null, new MonitoringContext(metrics));
+ verify(persist, timeout(100).times(1)).persistCommit(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContext.class));
+ requestProc.commitRequest(secondTS, writeSet, false, null, new MonitoringContext(metrics));
+ verify(persist, timeout(100).times(1)).persistAbort(eq(secondTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+
+ }
+
+ @Test(timeOut = 30_000)
+ public void testCommitRequestAbortsWhenResettingRequestProcessorState() throws Exception {
+
+ List<Long> writeSet = Collections.emptyList();
+
+ // Start a transaction...
+ requestProc.timestampRequest(null, new MonitoringContext(metrics));
+ ArgumentCaptor<Long> capturedTS = ArgumentCaptor.forClass(Long.class);
+ verify(persist, timeout(100).times(1)).persistTimestamp(capturedTS.capture(),
+ any(Channel.class),
+ any(MonitoringContext.class));
+ long startTS = capturedTS.getValue();
+
+ // ... simulate the reset of the RequestProcessor state (e.g. due to
+ // a change in mastership) and...
+ stateManager.reset();
+
+ // ...check that the transaction is aborted when trying to commit
+ requestProc.commitRequest(startTS, writeSet, false, null, new MonitoringContext(metrics));
+ verify(persist, timeout(100).times(1)).persistAbort(eq(startTS), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+
+ }
+
+ @Test(timeOut = 5_000)
+ public void testLowWatermarkIsStoredOnlyWhenACacheElementIsEvicted() throws Exception {
+
+ final int ANY_START_TS = 1;
+ final long FIRST_COMMIT_TS_EVICTED = 1L;
+ final long NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED = 2L;
+
+ // Fill the cache to provoke a cache eviction
+ for (long i = 0; i < CONFLICT_MAP_SIZE + CONFLICT_MAP_ASSOCIATIVITY; i++) {
+ long writeSetElementHash = i + 1; // This is to match the assigned CT: K/V in cache = WS Element Hash/CT
+ List<Long> writeSet = Lists.newArrayList(writeSetElementHash);
+ requestProc.commitRequest(ANY_START_TS, writeSet, false, null, new MonitoringContext(metrics));
+ }
+
+ Thread.currentThread().sleep(3000); // Allow the Request processor to finish the request processing
+
+ // Check that first time its called is on init
+ verify(persist, timeout(100).times(1)).persistLowWatermark(0L);
+ // Then, check it is called when cache is full and the first element is evicted (should be a 1)
+ verify(persist, timeout(100).times(1)).persistLowWatermark(FIRST_COMMIT_TS_EVICTED);
+ // Finally it should never be called with the next element
+ verify(persist, timeout(100).never()).persistLowWatermark(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED);
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
new file mode 100644
index 0000000..9ba4f9d
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.common.base.Optional;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.committable.CommitTable.CommitTimestamp;
+import org.apache.omid.committable.InMemoryCommitTable;
+import org.apache.omid.metrics.MetricsRegistry;
+import org.apache.omid.metrics.NullMetricsProvider;
+import org.jboss.netty.channel.Channel;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.testng.AssertJUnit.assertEquals;
+
+public class TestRetryProcessor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestRetryProcessor.class);
+
+ private MetricsRegistry metrics = new NullMetricsProvider();
+
+ private static long NON_EXISTING_ST_TX = 1000;
+ private static long ST_TX_1 = 0;
+ private static long CT_TX_1 = 1;
+ private static long ST_TX_2 = 2;
+
+ @Mock
+ private Channel channel;
+ @Mock
+ private ReplyProcessor replyProc;
+ @Mock
+ private Panicker panicker;
+
+ private CommitTable commitTable;
+
+ @BeforeMethod(alwaysRun = true, timeOut = 30_000)
+ public void initMocksAndComponents() {
+ MockitoAnnotations.initMocks(this);
+ // Init components
+ commitTable = new InMemoryCommitTable();
+ metrics = new NullMetricsProvider();
+
+ }
+
+ @Test(timeOut = 10_000)
+ public void testBasicFunctionality() throws Exception {
+
+ // The element to test
+ RetryProcessor retryProc = new RetryProcessorImpl(metrics, commitTable, replyProc, panicker);
+
+ // Test we'll reply with an abort for a retry request when the start timestamp IS NOT in the commit table
+ retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, new MonitoringContext(metrics));
+ ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
+ verify(replyProc, timeout(100).times(1)).abortResponse(firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+
+ long startTS = firstTScapture.getValue();
+ assertEquals("Captured timestamp should be the same as NON_EXISTING_ST_TX", NON_EXISTING_ST_TX, startTS);
+
+ // Test we'll reply with a commit for a retry request when the start timestamp IS in the commit table
+ commitTable.getWriter().addCommittedTransaction(ST_TX_1, CT_TX_1); // Add a tx to commit table
+
+ retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
+ ArgumentCaptor<Long> secondTScapture = ArgumentCaptor.forClass(Long.class);
+ verify(replyProc, timeout(100).times(1))
+ .commitResponse(eq(false), firstTScapture.capture(), secondTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+
+ startTS = firstTScapture.getValue();
+ long commitTS = secondTScapture.getValue();
+ assertEquals("Captured timestamp should be the same as ST_TX_1", ST_TX_1, startTS);
+ assertEquals("Captured timestamp should be the same as CT_TX_1", CT_TX_1, commitTS);
+ }
+
+ @Test(timeOut = 10_000)
+ public void testRetriedRequestForInvalidatedTransactionReturnsAnAbort() throws Exception {
+
+ // Invalidate the transaction
+ commitTable.getClient().tryInvalidateTransaction(ST_TX_1);
+
+ // Pre-start verification: Validate that the transaction is invalidated
+ // NOTE: This test should be in the a test class for InMemoryCommitTable
+ Optional<CommitTimestamp> invalidTxMarker = commitTable.getClient().getCommitTimestamp(ST_TX_1).get();
+ Assert.assertTrue(invalidTxMarker.isPresent());
+ Assert.assertEquals(invalidTxMarker.get().getValue(), InMemoryCommitTable.INVALID_TRANSACTION_MARKER);
+
+ // The element to test
+ RetryProcessor retryProc = new RetryProcessorImpl(metrics, commitTable, replyProc, panicker);
+
+ // Test we'll reply with an abort for a retry request when the
+ // transaction id IS in the commit table BUT invalidated
+ retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));
+ ArgumentCaptor<Long> startTScapture = ArgumentCaptor.forClass(Long.class);
+ verify(replyProc, timeout(100).times(1)).abortResponse(startTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+
+ long startTS = startTScapture.getValue();
+ Assert.assertEquals(startTS, ST_TX_1, "Captured timestamp should be the same as NON_EXISTING_ST_TX");
+
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
new file mode 100644
index 0000000..0f64181
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.omid.metrics.NullMetricsProvider;
+import org.apache.omid.proto.TSOProto;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelException;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
+import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyCollectionOf;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+@SuppressWarnings({"UnusedDeclaration", "StatementWithEmptyBody"})
+public class TestTSOChannelHandlerNetty {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestTSOChannelHandlerNetty.class);
+
+ @Mock
+ private
+ RequestProcessor requestProcessor;
+
+ // Component under test
+ private TSOChannelHandler channelHandler;
+
+ @BeforeMethod
+ public void beforeTestMethod() {
+ MockitoAnnotations.initMocks(this);
+ TSOServerConfig config = new TSOServerConfig();
+ config.setPort(1434);
+ channelHandler = new TSOChannelHandler(config, requestProcessor, new NullMetricsProvider());
+ }
+
+ @AfterMethod
+ public void afterTestMethod() throws IOException {
+ channelHandler.close();
+ }
+
+ @Test(timeOut = 10_000)
+ public void testMainAPI() throws Exception {
+
+ // Check initial state
+ assertNull(channelHandler.listeningChannel);
+ assertNull(channelHandler.channelGroup);
+
+ // Check initial connection
+ channelHandler.reconnect();
+ assertTrue(channelHandler.listeningChannel.isOpen());
+ assertEquals(channelHandler.channelGroup.size(), 1);
+ assertEquals(((InetSocketAddress) channelHandler.listeningChannel.getLocalAddress()).getPort(), 1434);
+
+ // Check connection close
+ channelHandler.closeConnection();
+ assertFalse(channelHandler.listeningChannel.isOpen());
+ assertEquals(channelHandler.channelGroup.size(), 0);
+
+ // Check re-closing connection
+ channelHandler.closeConnection();
+ assertFalse(channelHandler.listeningChannel.isOpen());
+ assertEquals(channelHandler.channelGroup.size(), 0);
+
+ // Check connection after closing
+ channelHandler.reconnect();
+ assertTrue(channelHandler.listeningChannel.isOpen());
+ assertEquals(channelHandler.channelGroup.size(), 1);
+
+ // Check re-connection
+ channelHandler.reconnect();
+ assertTrue(channelHandler.listeningChannel.isOpen());
+ assertEquals(channelHandler.channelGroup.size(), 1);
+
+ // Exercise closeable with re-connection trial
+ channelHandler.close();
+ assertFalse(channelHandler.listeningChannel.isOpen());
+ assertEquals(channelHandler.channelGroup.size(), 0);
+ try {
+ channelHandler.reconnect();
+ } catch (ChannelException e) {
+ // Expected: Can't reconnect after closing
+ assertFalse(channelHandler.listeningChannel.isOpen());
+ assertEquals(channelHandler.channelGroup.size(), 0);
+ }
+
+ }
+
+ @Test(timeOut = 10_000)
+ public void testNettyConnectionToTSOFromClient() throws Exception {
+
+ ClientBootstrap nettyClient = createNettyClientBootstrap();
+
+ ChannelFuture channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Test the client can't connect cause the server is not there
+ // ------------------------------------------------------------------------------------------------------------
+ while (!channelF.isDone()) /** do nothing */ ;
+ assertFalse(channelF.isSuccess());
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Test creation of a server connection
+ // ------------------------------------------------------------------------------------------------------------
+ channelHandler.reconnect();
+ assertTrue(channelHandler.listeningChannel.isOpen());
+ // Eventually the channel group of the server should contain the listening channel
+ assertEquals(channelHandler.channelGroup.size(), 1);
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Test that a client can connect now
+ // ------------------------------------------------------------------------------------------------------------
+ channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
+ while (!channelF.isDone()) /** do nothing */ ;
+ assertTrue(channelF.isSuccess());
+ assertTrue(channelF.getChannel().isConnected());
+ // Eventually the channel group of the server should have 2 elements
+ while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Close the channel on the client side and test we have one element less in the channel group
+ // ------------------------------------------------------------------------------------------------------------
+ channelF.getChannel().close().await();
+ // Eventually the channel group of the server should have only one element
+ while (channelHandler.channelGroup.size() != 1) /** do nothing */ ;
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Open a new channel and test the connection closing on the server side through the channel handler
+ // ------------------------------------------------------------------------------------------------------------
+ channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
+ while (!channelF.isDone()) /** do nothing */ ;
+ assertTrue(channelF.isSuccess());
+ // Eventually the channel group of the server should have 2 elements again
+ while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
+ channelHandler.closeConnection();
+ assertFalse(channelHandler.listeningChannel.isOpen());
+ assertEquals(channelHandler.channelGroup.size(), 0);
+ // Wait some time and check the channel was closed
+ TimeUnit.SECONDS.sleep(1);
+ assertFalse(channelF.getChannel().isOpen());
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Test server re-connections with connected clients
+ // ------------------------------------------------------------------------------------------------------------
+ // Connect first time
+ channelHandler.reconnect();
+ assertTrue(channelHandler.listeningChannel.isOpen());
+ // Eventually the channel group of the server should contain the listening channel
+ assertEquals(channelHandler.channelGroup.size(), 1);
+ // Check the client can connect
+ channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
+ while (!channelF.isDone()) /** do nothing */ ;
+ assertTrue(channelF.isSuccess());
+ // Eventually the channel group of the server should have 2 elements
+ while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
+ // Re-connect and check that client connection was gone
+ channelHandler.reconnect();
+ assertTrue(channelHandler.listeningChannel.isOpen());
+ // Eventually the channel group of the server should contain the listening channel
+ assertEquals(channelHandler.channelGroup.size(), 1);
+ // Wait some time and check the channel was closed
+ TimeUnit.SECONDS.sleep(1);
+ assertFalse(channelF.getChannel().isOpen());
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Test closeable interface with re-connection trial
+ // ------------------------------------------------------------------------------------------------------------
+ channelHandler.close();
+ assertFalse(channelHandler.listeningChannel.isOpen());
+ assertEquals(channelHandler.channelGroup.size(), 0);
+ }
+
+ @Test(timeOut = 10_000)
+ public void testNettyChannelWriting() throws Exception {
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Prepare test
+ // ------------------------------------------------------------------------------------------------------------
+
+ // Connect channel handler
+ channelHandler.reconnect();
+ // Create client and connect it
+ ClientBootstrap nettyClient = createNettyClientBootstrap();
+ ChannelFuture channelF = nettyClient.connect(new InetSocketAddress("localhost", 1434));
+ // Basic checks for connection
+ while (!channelF.isDone()) /** do nothing */ ;
+ assertTrue(channelF.isSuccess());
+ assertTrue(channelF.getChannel().isConnected());
+ Channel channel = channelF.getChannel();
+ // Eventually the channel group of the server should have 2 elements
+ while (channelHandler.channelGroup.size() != 2) /** do nothing */ ;
+ // Write first handshake request
+ TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
+ // NOTE: Add here the required handshake capabilities when necessary
+ handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
+ channelF.getChannel().write(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Test channel writing
+ // ------------------------------------------------------------------------------------------------------------
+ testWritingTimestampRequest(channel);
+
+ testWritingCommitRequest(channel);
+ }
+
+ private void testWritingTimestampRequest(Channel channel) throws InterruptedException {
+ // Reset mock
+ reset(requestProcessor);
+ TSOProto.Request.Builder tsBuilder = TSOProto.Request.newBuilder();
+ TSOProto.TimestampRequest.Builder tsRequestBuilder = TSOProto.TimestampRequest.newBuilder();
+ tsBuilder.setTimestampRequest(tsRequestBuilder.build());
+ // Write into the channel
+ channel.write(tsBuilder.build()).await();
+ verify(requestProcessor, timeout(100).times(1)).timestampRequest(any(Channel.class), any(MonitoringContext.class));
+ verify(requestProcessor, timeout(100).never())
+ .commitRequest(anyLong(), anyCollectionOf(Long.class), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+ }
+
+ private void testWritingCommitRequest(Channel channel) throws InterruptedException {
+ // Reset mock
+ reset(requestProcessor);
+ TSOProto.Request.Builder commitBuilder = TSOProto.Request.newBuilder();
+ TSOProto.CommitRequest.Builder commitRequestBuilder = TSOProto.CommitRequest.newBuilder();
+ commitRequestBuilder.setStartTimestamp(666);
+ commitRequestBuilder.addCellId(666);
+ commitBuilder.setCommitRequest(commitRequestBuilder.build());
+ TSOProto.Request r = commitBuilder.build();
+ assertTrue(r.hasCommitRequest());
+ // Write into the channel
+ channel.write(commitBuilder.build()).await();
+ verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContext.class));
+ verify(requestProcessor, timeout(100).times(1))
+ .commitRequest(eq(666L), anyCollectionOf(Long.class), eq(false), any(Channel.class), any(MonitoringContext.class));
+ }
+
+ // ----------------------------------------------------------------------------------------------------------------
+ // Helper methods
+ // ----------------------------------------------------------------------------------------------------------------
+
+ private ClientBootstrap createNettyClientBootstrap() {
+
+ ChannelFactory factory = new NioClientSocketChannelFactory(
+ Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setNameFormat("client-boss-%d").build()),
+ Executors.newCachedThreadPool(
+ new ThreadFactoryBuilder().setNameFormat("client-worker-%d").build()), 1);
+ // Create the bootstrap
+ ClientBootstrap bootstrap = new ClientBootstrap(factory);
+ bootstrap.setOption("tcpNoDelay", true);
+ bootstrap.setOption("keepAlive", true);
+ bootstrap.setOption("reuseAddress", true);
+ bootstrap.setOption("connectTimeoutMillis", 100);
+ ChannelPipeline pipeline = bootstrap.getPipeline();
+ pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
+ pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+ pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
+ pipeline.addLast("protobufencoder", new ProtobufEncoder());
+ pipeline.addLast("handler", new SimpleChannelHandler() {
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+ LOG.info("Channel {} connected", ctx.getChannel());
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+ LOG.error("Error on channel {}", ctx.getChannel(), e.getCause());
+ }
+
+ });
+ return bootstrap;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9cd856c6/tso-server/src/test/java/org/apache/omid/tso/TestTSOStateManager.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTSOStateManager.java b/tso-server/src/test/java/org/apache/omid/tso/TestTSOStateManager.java
new file mode 100644
index 0000000..350db5f
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTSOStateManager.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso;
+
+import org.apache.omid.tso.TSOStateManager.StateObserver;
+import org.apache.omid.tso.TSOStateManager.TSOState;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+
+public class TestTSOStateManager {
+
+ private static final long INITIAL_STATE_VALUE = 1L;
+ private static final long NEW_STATE_VALUE = 1000;
+
+ // Mocks
+ private TimestampOracle timestampOracle = mock(TimestampOracle.class);
+
+ // Component under test
+ private TSOStateManager stateManager = new TSOStateManagerImpl(timestampOracle);
+
+ @BeforeMethod
+ public void beforeMethod() {
+ // Initialize the state with the one reported by the Timestamp Oracle
+ when(timestampOracle.getLast()).thenReturn(INITIAL_STATE_VALUE);
+ }
+
+ @Test
+ public void testResetOfTSOServerState() throws Exception {
+
+ // Reset the state and check we get the initial state values
+ TSOState initialState = stateManager.reset();
+ assertEquals(initialState.getLowWatermark(), INITIAL_STATE_VALUE);
+ assertEquals(initialState.getEpoch(), INITIAL_STATE_VALUE);
+ assertTrue("In this implementation low watermark should be equal to epoch",
+ initialState.getLowWatermark() == initialState.getEpoch());
+
+ // Then, simulate a change in the state returned by the Timestamp Oracle...
+ when(timestampOracle.getLast()).thenReturn(NEW_STATE_VALUE);
+ // ... and again, reset the state and check we get the new values
+ TSOState secondState = stateManager.reset();
+ assertEquals(secondState.getLowWatermark(), NEW_STATE_VALUE);
+ assertEquals(secondState.getEpoch(), NEW_STATE_VALUE);
+ assertTrue("In this implementation low watermark should be equal to epoch",
+ secondState.getLowWatermark() == secondState.getEpoch());
+
+ }
+
+ @Test
+ public void testObserverRegistrationAndDeregistrationForStateChanges() throws Exception {
+
+ // Register observer 1 for receiving state changes
+ StateObserver observer1 = spy(new DummyObserver());
+ stateManager.register(observer1);
+
+ // Reset the state to trigger observer notifications
+ TSOState state = stateManager.reset();
+
+ // Check observer 1 was notified with the corresponding state
+ verify(observer1, timeout(100).times(1)).update(eq(state));
+
+ // Register observer 1 for receiving state changes
+ StateObserver observer2 = spy(new DummyObserver());
+ stateManager.register(observer2);
+
+ // Again, reset the state to trigger observer notifications
+ state = stateManager.reset();
+
+ // Check both observers were notified with the corresponding state
+ verify(observer1, timeout(100).times(1)).update(eq(state));
+ verify(observer2, timeout(100).times(1)).update(eq(state));
+
+ // De-register observer 1
+ stateManager.unregister(observer1);
+
+ // Again, reset the state to trigger observer notifications
+ state = stateManager.reset();
+
+ // Check only observer 2 was notified
+ verify(observer1, timeout(100).times(0)).update(eq(state));
+ verify(observer2, timeout(100).times(1)).update(eq(state));
+ }
+
+ // ------------------------------------------------------------------------
+ // -------------------------- Helper classes ------------------------------
+ // ------------------------------------------------------------------------
+
+ private class DummyObserver implements StateObserver {
+
+ @Override
+ public void update(TSOState state) throws IOException {
+ }
+
+ }
+
+}