You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by yo...@apache.org on 2019/04/29 16:46:51 UTC
[incubator-omid] 01/01: OMID-146 Fix consistensy of compaction when
the commit time stamp hasnt reached the commit table yet.
This is an automated email from the ASF dual-hosted git repository.
yonigo pushed a commit to branch 1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-omid.git
commit 764faed1091fa8fca8751c71f68203218f09378a
Author: Yonatan Gottesman <yo...@gmail.com>
AuthorDate: Mon Apr 22 16:19:20 2019 +0300
OMID-146 Fix consistensy of compaction when the commit time stamp hasnt reached the commit table yet.
---
.../apache/omid/transaction/CompactorScanner.java | 90 ++++--
.../apache/omid/transaction/TestCompaction.java | 27 ++
.../apache/omid/transaction/TestCompactionLL.java | 327 +++++++++++++++++++++
.../omid/transaction/TestSnapshotFilter.java | 4 +-
.../apache/omid/tso/AbstractRequestProcessor.java | 9 +-
.../src/main/java/org/apache/omid/tso/Batch.java | 5 +-
.../java/org/apache/omid/tso/PersistEvent.java | 10 +-
.../org/apache/omid/tso/PersistenceProcessor.java | 3 +-
.../apache/omid/tso/PersistenceProcessorImpl.java | 6 +-
.../omid/tso/PersitenceProcessorNullImpl.java | 7 +-
.../java/org/apache/omid/tso/ReplyProcessor.java | 10 +-
.../org/apache/omid/tso/ReplyProcessorImpl.java | 31 +-
.../apache/omid/tso/RequestProcessorPersistCT.java | 6 +-
.../apache/omid/tso/RequestProcessorSkipCT.java | 5 +-
.../org/apache/omid/tso/RetryProcessorImpl.java | 2 +-
.../test/java/org/apache/omid/tso/TestBatch.java | 7 +-
.../java/org/apache/omid/tso/TestPanicker.java | 5 +-
.../apache/omid/tso/TestPersistenceProcessor.java | 53 ++--
.../omid/tso/TestPersistenceProcessorHandler.java | 17 +-
.../org/apache/omid/tso/TestReplyProcessor.java | 60 +++-
.../org/apache/omid/tso/TestRequestProcessor.java | 22 +-
.../org/apache/omid/tso/TestRetryProcessor.java | 2 +-
...grationOfTSOClientServerBasicFunctionality.java | 28 ++
23 files changed, 622 insertions(+), 114 deletions(-)
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
index 70c1d27..df30586 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorScanner.java
@@ -228,43 +228,91 @@ public class CompactorScanner implements InternalScanner {
}
}
- private Optional<CommitTimestamp> queryCommitTimestamp(Cell cell) throws IOException {
- Optional<CommitTimestamp> cachedValue = commitCache.get(cell.getTimestamp());
- if (cachedValue != null) {
- return cachedValue;
- }
+
+ private Result getShadowCell(byte[] row, byte[] family, byte[] qualifier, long timestamp) throws IOException {
+ Get g = new Get(row);
+ g.addColumn(family, qualifier);
+ g.setTimeStamp(timestamp);
+ Result r = hRegion.get(g);
+ return r;
+ }
+
+
+ private Optional<CommitTimestamp> getCommitTimestampWithRaces(Cell cell) throws IOException {
try {
+ byte[] family = CellUtil.cloneFamily(cell);
+ byte[] qualifier = CellUtils.addShadowCellSuffixPrefix(cell.getQualifierArray(),
+ cell.getQualifierOffset(),
+ cell.getQualifierLength());
+ // 2) Then check the commit table
Optional<CommitTimestamp> ct = commitTableClient.getCommitTimestamp(cell.getTimestamp()).get();
if (ct.isPresent()) {
- commitCache.put(cell.getTimestamp(), ct);
- return Optional.of(ct.get());
- } else {
- Get g = new Get(CellUtil.cloneRow(cell));
- byte[] family = CellUtil.cloneFamily(cell);
- byte[] qualifier = CellUtils.addShadowCellSuffixPrefix(cell.getQualifierArray(),
- cell.getQualifierOffset(),
- cell.getQualifierLength());
- g.addColumn(family, qualifier);
- g.setTimeStamp(cell.getTimestamp());
- Result r = hRegion.get(g);
- if (r.containsColumn(family, qualifier)) {
+ if (ct.get().isValid()) {
+ return Optional.of(ct.get());
+ }
+ // If invalid still should check sc because maybe we got falsely invalidated by another compaction or ll client
+ }
+
+ // 3) Read from shadow cell
+ Result r = getShadowCell(CellUtil.cloneRow(cell), family, qualifier, cell.getTimestamp());
+ if (r.containsColumn(CellUtil.cloneFamily(cell), qualifier)) {
+ Optional<CommitTimestamp> retval = Optional.of(new CommitTimestamp(SHADOW_CELL,
+ Bytes.toLong(r.getValue(family, qualifier)), true));
+ return retval;
+ }
+
+ // [OMID-146] - we have to invalidate a transaction if it hasn't reached the commit table
+ // 4) invalidate the entry
+ Boolean invalidated = commitTableClient.tryInvalidateTransaction(cell.getTimestamp()).get();
+ if (invalidated) {
+ // If we are running lowLatency Omid, we could have managed to invalidate a ct entry,
+ // but the committing client already wrote to shadow cells:
+ Result r2 = getShadowCell(CellUtil.cloneRow(cell), family, qualifier, cell.getTimestamp());
+ if (r2.containsColumn(CellUtil.cloneFamily(cell), qualifier)) {
Optional<CommitTimestamp> retval = Optional.of(new CommitTimestamp(SHADOW_CELL,
- Bytes.toLong(r.getValue(family, qualifier)), true));
- commitCache.put(cell.getTimestamp(), retval);
+ Bytes.toLong(r2.getValue(family, qualifier)), true));
+ commitTableClient.deleteCommitEntry(cell.getTimestamp());
return retval;
-
}
+ return Optional.absent();
+ }
+
+ // 5) We did not manage to invalidate the transactions then check the commit table
+ Optional<CommitTimestamp> ct2 = commitTableClient.getCommitTimestamp(cell.getTimestamp()).get();
+ if (ct2.isPresent()) {
+ return Optional.of(ct2.get());
+ }
+
+ // 6) Read from shadow cell
+ Result r2 = getShadowCell(CellUtil.cloneRow(cell), family, qualifier, cell.getTimestamp());
+ if (r2.containsColumn(CellUtil.cloneFamily(cell), qualifier)) {
+ Optional<CommitTimestamp> retval = Optional.of(new CommitTimestamp(SHADOW_CELL,
+ Bytes.toLong(r2.getValue(family, qualifier)), true));
+ return retval;
}
+
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while getting commit timestamp from commit table");
} catch (ExecutionException e) {
throw new IOException("Error getting commit timestamp from commit table", e);
}
- commitCache.put(cell.getTimestamp(), Optional.<CommitTimestamp>absent());
+
return Optional.absent();
}
+ private Optional<CommitTimestamp> queryCommitTimestamp(Cell cell) throws IOException {
+
+ // 1) First check the cache
+ Optional<CommitTimestamp> cachedValue = commitCache.get(cell.getTimestamp());
+ if (cachedValue != null) {
+ return cachedValue;
+ }
+ Optional<CommitTimestamp> value = getCommitTimestampWithRaces(cell);
+ commitCache.put(cell.getTimestamp(), value);
+ return value;
+ }
+
private void retain(List<Cell> result, Cell cell, Optional<Cell> shadowCell) {
LOG.trace("Retaining cell {}", cell);
result.add(cell);
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
index 666b19b..6e9c93b 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
@@ -1462,6 +1462,33 @@ public class TestCompaction {
"Put shadow cell shouldn't be there");
}
+ @Test(timeOut = 60_000)
+ public void testCommitTableNoInvalidation() throws Exception {
+ String TEST_TABLE = "testCommitTableInvalidation";
+ createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
+ TTable txTable = new TTable(connection, TEST_TABLE);
+ byte[] rowId = Bytes.toBytes("row");
+
+ HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
+ Put p = new Put(rowId);
+ p.addColumn(fam, qual, Bytes.toBytes("testValue"));
+ txTable.put(tx1, p);
+
+ HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
+ compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
+
+ try {
+ //give compaction time to invalidate
+ Thread.sleep(1000);
+
+ tm.commit(tx1);
+
+ } catch (RollbackException e) {
+ fail(" Should have not been invalidated");
+ }
+ }
+
+
private void setCompactorLWM(long lwm, String tableName) throws Exception {
OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(tableName)).get(0)
.getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactionLL.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactionLL.java
new file mode 100644
index 0000000..38b3a3d
--- /dev/null
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompactionLL.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+
+import static org.mockito.Mockito.doReturn;
+
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.io.IOException;
+
+
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+
+
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+
+import org.apache.hadoop.hbase.client.Put;
+
+import org.apache.hadoop.hbase.client.ResultScanner;
+
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.omid.TestUtils;
+import org.apache.omid.committable.CommitTable;
+import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
+import org.apache.omid.metrics.NullMetricsProvider;
+import org.apache.omid.timestamp.storage.HBaseTimestampStorageConfig;
+import org.apache.omid.tso.TSOServer;
+import org.apache.omid.tso.TSOServerConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+public class TestCompactionLL {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestCompactionLL.class);
+
+ private static final String TEST_FAMILY = "test-fam";
+ private static final String TEST_QUALIFIER = "test-qual";
+
+ private final byte[] fam = Bytes.toBytes(TEST_FAMILY);
+ private final byte[] qual = Bytes.toBytes(TEST_QUALIFIER);
+ private final byte[] data = Bytes.toBytes("testWrite-1");
+
+ private static final int MAX_VERSIONS = 3;
+
+ private Random randomGenerator;
+ private AbstractTransactionManager tm;
+
+ private Injector injector;
+
+ private Admin admin;
+ private Configuration hbaseConf;
+ private HBaseTestingUtility hbaseTestUtil;
+ private MiniHBaseCluster hbaseCluster;
+
+ private TSOServer tso;
+
+
+ private CommitTable commitTable;
+ private PostCommitActions syncPostCommitter;
+ private static Connection connection;
+
+ @BeforeClass
+ public void setupTestCompation() throws Exception {
+ TSOServerConfig tsoConfig = new TSOServerConfig();
+ tsoConfig.setPort(1235);
+ tsoConfig.setConflictMapSize(1);
+ tsoConfig.setLowLatency(true);
+ tsoConfig.setWaitStrategy("LOW_CPU");
+ injector = Guice.createInjector(new TSOForHBaseCompactorTestModule(tsoConfig));
+ hbaseConf = injector.getInstance(Configuration.class);
+ HBaseCommitTableConfig hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class);
+ HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
+
+ // settings required for #testDuplicateDeletes()
+ hbaseConf.setInt("hbase.hstore.compaction.min", 2);
+ hbaseConf.setInt("hbase.hstore.compaction.max", 2);
+ setupHBase();
+ connection = ConnectionFactory.createConnection(hbaseConf);
+ admin = connection.getAdmin();
+ createRequiredHBaseTables(hBaseTimestampStorageConfig, hBaseCommitTableConfig);
+ setupTSO();
+
+ commitTable = injector.getInstance(CommitTable.class);
+ }
+
+ private void setupHBase() throws Exception {
+ LOG.info("--------------------------------------------------------------------------------------------------");
+ LOG.info("Setting up HBase");
+ LOG.info("--------------------------------------------------------------------------------------------------");
+ hbaseTestUtil = new HBaseTestingUtility(hbaseConf);
+ LOG.info("--------------------------------------------------------------------------------------------------");
+ LOG.info("Creating HBase MiniCluster");
+ LOG.info("--------------------------------------------------------------------------------------------------");
+ hbaseCluster = hbaseTestUtil.startMiniCluster(1);
+ }
+
+ private void createRequiredHBaseTables(HBaseTimestampStorageConfig timestampStorageConfig,
+ HBaseCommitTableConfig hBaseCommitTableConfig) throws IOException {
+ createTableIfNotExists(timestampStorageConfig.getTableName(), timestampStorageConfig.getFamilyName().getBytes());
+
+ createTableIfNotExists(hBaseCommitTableConfig.getTableName(), hBaseCommitTableConfig.getCommitTableFamily(), hBaseCommitTableConfig.getLowWatermarkFamily());
+ }
+
+ private void createTableIfNotExists(String tableName, byte[]... families) throws IOException {
+ if (!admin.tableExists(TableName.valueOf(tableName))) {
+ LOG.info("Creating {} table...", tableName);
+ HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+
+ for (byte[] family : families) {
+ HColumnDescriptor datafam = new HColumnDescriptor(family);
+ datafam.setMaxVersions(MAX_VERSIONS);
+ desc.addFamily(datafam);
+ }
+
+ desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation",null,Coprocessor.PRIORITY_HIGHEST,null);
+ admin.createTable(desc);
+ for (byte[] family : families) {
+ CompactorUtil.enableOmidCompaction(connection, TableName.valueOf(tableName), family);
+ }
+ }
+
+ }
+
+ private void setupTSO() throws IOException, InterruptedException {
+ tso = injector.getInstance(TSOServer.class);
+ tso.startAndWait();
+ TestUtils.waitForSocketListening("localhost", 1235, 100);
+ Thread.currentThread().setName("UnitTest(s) thread");
+ }
+
+ @AfterClass
+ public void cleanupTestCompation() throws Exception {
+ teardownTSO();
+ hbaseCluster.shutdown();
+ }
+
+ private void teardownTSO() throws IOException, InterruptedException {
+ tso.stopAndWait();
+ TestUtils.waitForSocketNotListening("localhost", 1235, 1000);
+ }
+
+ @BeforeMethod
+ public void setupTestCompactionIndividualTest() throws Exception {
+ randomGenerator = new Random(0xfeedcafeL);
+ tm = spy((AbstractTransactionManager) newTransactionManager());
+ }
+
+ private TransactionManager newTransactionManager() throws Exception {
+ HBaseOmidClientConfiguration hbaseOmidClientConf = new HBaseOmidClientConfiguration();
+ hbaseOmidClientConf.setConnectionString("localhost:1235");
+ hbaseOmidClientConf.setHBaseConfiguration(hbaseConf);
+ CommitTable.Client commitTableClient = commitTable.getClient();
+ syncPostCommitter =
+ spy(new HBaseSyncPostCommitter(new NullMetricsProvider(),commitTableClient, connection));
+ return HBaseTransactionManager.builder(hbaseOmidClientConf)
+ .postCommitter(syncPostCommitter)
+ .commitTableClient(commitTableClient)
+ .build();
+ }
+
+
+
+ @Test(timeOut = 60_000)
+ public void testRowsUnalteredWhenCommitTableCannotBeReached() throws Throwable {
+ String TEST_TABLE = "testRowsUnalteredWhenCommitTableCannotBeReachedLL";
+ createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
+ TTable txTable = new TTable(connection, TEST_TABLE);
+
+ // The KV in this transaction should be discarded but in the end should remain there because
+ // the commit table won't be accessed (simulating an error on access)
+ HBaseTransaction neverendingTx = (HBaseTransaction) tm.begin();
+ long rowId = randomGenerator.nextLong();
+ Put put = new Put(Bytes.toBytes(rowId));
+ put.addColumn(fam, qual, data);
+ txTable.put(neverendingTx, put);
+ assertTrue(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
+ new TTableCellGetterAdapter(txTable)),
+ "Cell should be there");
+ assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
+ new TTableCellGetterAdapter(txTable)),
+ "Shadow cell should not be there");
+
+ assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one rows in table before flushing");
+ LOG.info("Flushing table {}", TEST_TABLE);
+ admin.flush(TableName.valueOf(TEST_TABLE));
+ assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one rows in table after flushing");
+
+ // Break access to CommitTable functionality in Compactor
+ LOG.info("Regions in table {}: {}", TEST_TABLE, hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).size());
+ OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(TEST_TABLE)).get(0)
+ .getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
+ CommitTable commitTable = injector.getInstance(CommitTable.class);
+ CommitTable.Client commitTableClient = spy(commitTable.getClient());
+ SettableFuture<Long> f = SettableFuture.create();
+ f.setException(new IOException("Unable to read"));
+ doReturn(f).when(commitTableClient).readLowWatermark();
+ omidCompactor.commitTableClient = commitTableClient;
+ LOG.info("Compacting table {}", TEST_TABLE);
+ admin.majorCompact(TableName.valueOf(TEST_TABLE)); // Should trigger the error when accessing CommitTable funct.
+
+ LOG.info("Sleeping for 3 secs");
+ Thread.sleep(3000);
+ LOG.info("Waking up after 3 secs");
+
+ // All rows should be there after the failed compaction
+ assertEquals(rowCount(TEST_TABLE, fam), 1, "There should be only one row in table after compacting");
+ assertTrue(CellUtils.hasCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
+ new TTableCellGetterAdapter(txTable)),
+ "Cell should be there");
+ assertFalse(CellUtils.hasShadowCell(Bytes.toBytes(rowId), fam, qual, neverendingTx.getStartTimestamp(),
+ new TTableCellGetterAdapter(txTable)),
+ "Shadow cell should not be there");
+ }
+
+
+ @Test(timeOut = 60_000)
+ // test omid-147 in ll mode the scanner should invalidate the transaction
+ public void testCommitTableInvalidation() throws Exception {
+ String TEST_TABLE = "testCommitTableInvalidationLL";
+ createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY));
+ TTable txTable = new TTable(connection, TEST_TABLE);
+ byte[] rowId = Bytes.toBytes("row");
+
+ HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
+ Put p = new Put(rowId);
+ p.addColumn(fam, qual, Bytes.toBytes("testValue"));
+ txTable.put(tx1, p);
+
+ HBaseTransaction lwmTx = (HBaseTransaction) tm.begin();
+ compactWithLWM(lwmTx.getStartTimestamp(), TEST_TABLE);
+
+ try {
+ //give compaction time to invalidate
+ Thread.sleep(1000);
+
+ tm.commit(tx1);
+ fail(" Should have been invalidated");
+ } catch (RollbackException e) {
+ e.printStackTrace();
+ }
+ }
+
+
+ private void setCompactorLWM(long lwm, String tableName) throws Exception {
+ OmidCompactor omidCompactor = (OmidCompactor) hbaseCluster.getRegions(Bytes.toBytes(tableName)).get(0)
+ .getCoprocessorHost().findCoprocessor(OmidCompactor.class.getName());
+ CommitTable commitTable = injector.getInstance(CommitTable.class);
+ CommitTable.Client commitTableClient = spy(commitTable.getClient());
+ SettableFuture<Long> f = SettableFuture.create();
+ f.set(lwm);
+ doReturn(f).when(commitTableClient).readLowWatermark();
+ omidCompactor.commitTableClient = commitTableClient;
+ }
+
+ private void compactEverything(String tableName) throws Exception {
+ compactWithLWM(Long.MAX_VALUE, tableName);
+ }
+
+ private void compactWithLWM(long lwm, String tableName) throws Exception {
+ admin.flush(TableName.valueOf(tableName));
+
+ LOG.info("Regions in table {}: {}", tableName, hbaseCluster.getRegions(Bytes.toBytes(tableName)).size());
+ setCompactorLWM(lwm, tableName);
+ LOG.info("Compacting table {}", tableName);
+ admin.majorCompact(TableName.valueOf(tableName));
+
+ LOG.info("Sleeping for 3 secs");
+ Thread.sleep(3000);
+ LOG.info("Waking up after 3 secs");
+ }
+
+ private static long rowCount(String tableName, byte[] family) throws Throwable {
+ Scan scan = new Scan();
+ scan.addFamily(family);
+ Table table = connection.getTable(TableName.valueOf(tableName));
+ try (ResultScanner scanner = table.getScanner(scan)) {
+ int count = 0;
+ while (scanner.next() != null) {
+ count++;
+ }
+ return count;
+ }
+ }
+}
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
index 2cfc77e..9471aa2 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
@@ -566,14 +566,14 @@ public class TestSnapshotFilter {
Put put1 = new Put(rowName1);
put1.addColumn(famName1, colName1, dataValue1);
tt.put(tx1, put1);
-
+
tm.commit(tx1);
Transaction tx2 = tm.begin();
Put put2 = new Put(rowName1);
put2.addColumn(famName1, colName1, dataValue1);
tt.put(tx2, put2);
-
+
Transaction tx3 = tm.begin();
Get get = new Get(rowName1);
diff --git a/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java
index 2dac28f..8a527ab 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java
@@ -17,6 +17,7 @@
*/
package org.apache.omid.tso;
+import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
@@ -234,7 +235,7 @@ abstract class AbstractRequestProcessor implements EventHandler<AbstractRequestP
!hasConflictsWithCommittedTransactions(startTimestamp, writeSet)) {
long commitTimestamp = timestampOracle.next();
-
+ Optional<Long> forwardNewWaterMark = Optional.absent();
if (nonEmptyWriteSet) {
long newLowWatermark = lowWatermark;
@@ -246,11 +247,11 @@ abstract class AbstractRequestProcessor implements EventHandler<AbstractRequestP
if (newLowWatermark != lowWatermark) {
LOG.trace("Setting new low Watermark to {}", newLowWatermark);
lowWatermark = newLowWatermark;
- lowWatermarkWriter.persistLowWatermark(newLowWatermark); // Async persist
+ forwardNewWaterMark = Optional.of(lowWatermark);
}
}
event.getMonCtx().timerStop("request.processor.commit.latency");
- forwardCommit(startTimestamp, commitTimestamp, c, event.getMonCtx());
+ forwardCommit(startTimestamp, commitTimestamp, c, event.getMonCtx(), forwardNewWaterMark);
} else {
@@ -296,7 +297,7 @@ abstract class AbstractRequestProcessor implements EventHandler<AbstractRequestP
}
- protected abstract void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
+ protected abstract void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx, Optional<Long> lowWatermark) throws Exception;
protected abstract void forwardCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
protected abstract void forwardAbort(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
protected abstract void forwardTimestamp(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
diff --git a/tso-server/src/main/java/org/apache/omid/tso/Batch.java b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
index 111c81c..7e6f43b 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/Batch.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
@@ -18,6 +18,7 @@
package org.apache.omid.tso;
import com.google.common.base.Objects;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
@@ -113,13 +114,13 @@ public class Batch {
}
- void addCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context) {
+ void addCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context, Optional<Long> newLowWatermark) {
Preconditions.checkState(!isFull(), "batch is full");
int index = numEvents++;
PersistEvent e = events[index];
context.timerStart("persistence.processor.commit.latency");
- e.makePersistCommit(startTimestamp, commitTimestamp, c, context);
+ e.makePersistCommit(startTimestamp, commitTimestamp, newLowWatermark, c, context);
}
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
index b89cdc5..977af00 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
@@ -18,6 +18,7 @@
package org.apache.omid.tso;
import com.google.common.base.Objects;
+import com.google.common.base.Optional;
import org.jboss.netty.channel.Channel;
public final class PersistEvent {
@@ -33,15 +34,16 @@ public final class PersistEvent {
private long startTimestamp = 0L;
private long commitTimestamp = 0L;
+ private Optional<Long> newLowWatermark;
- void makePersistCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
+ void makePersistCommit(long startTimestamp, long commitTimestamp, Optional<Long> newLowWatermark, Channel c, MonitoringContext monCtx) {
this.type = Type.COMMIT;
this.startTimestamp = startTimestamp;
this.commitTimestamp = commitTimestamp;
this.channel = c;
this.monCtx = monCtx;
-
+ this.newLowWatermark = newLowWatermark;
}
void makeCommitRetry(long startTimestamp, Channel c, MonitoringContext monCtx) {
@@ -111,6 +113,10 @@ public final class PersistEvent {
}
+ public Optional<Long> getNewLowWatermark() {
+ return newLowWatermark;
+ }
+
@Override
public String toString() {
return Objects.toStringHelper(this)
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
index 8bfe048..12831fa 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
@@ -17,13 +17,14 @@
*/
package org.apache.omid.tso;
+import com.google.common.base.Optional;
import org.jboss.netty.channel.Channel;
import java.io.Closeable;
interface PersistenceProcessor extends Closeable {
- void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
+ void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx, Optional<Long> lowWatermark)
throws Exception;
void addCommitRetryToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index 34276a3..123f783 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -19,6 +19,7 @@ package org.apache.omid.tso;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
+import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.name.Named;
import com.lmax.disruptor.EventFactory;
@@ -110,10 +111,11 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
}
@Override
- public void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx)
+ public void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx,
+ Optional<Long> newLowWatermark)
throws Exception {
- currentBatch.addCommit(startTimestamp, commitTimestamp, c, monCtx);
+ currentBatch.addCommit(startTimestamp, commitTimestamp, c, monCtx, newLowWatermark);
if (currentBatch.isFull()) {
triggerCurrentBatchFlush();
}
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java
index 773500c..8e0d171 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersitenceProcessorNullImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.omid.tso;
+import com.google.common.base.Optional;
import org.jboss.netty.channel.Channel;
import java.io.IOException;
@@ -24,8 +25,8 @@ import java.io.IOException;
public class PersitenceProcessorNullImpl implements PersistenceProcessor {
@Override
- public void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
-
+ public void addCommitToBatch(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx, Optional<Long> lowWatermark) throws Exception {
+ System.out.println("a");
}
@Override
@@ -40,7 +41,7 @@ public class PersitenceProcessorNullImpl implements PersistenceProcessor {
@Override
public void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
-
+ System.out.println("a");
}
@Override
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
index b580715..6c8186c 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
@@ -17,6 +17,7 @@
*/
package org.apache.omid.tso;
+import com.google.common.base.Optional;
import org.jboss.netty.channel.Channel;
import java.io.Closeable;
@@ -36,15 +37,16 @@ interface ReplyProcessor extends Closeable {
/**
* Allows to send a commit response back to the client.
- *
- * @param startTimestamp
+ * @param startTimestamp
* the start timestamp representing the tx identifier that is going to receive the commit response
* @param commitTimestamp
* the commit timestamp
* @param channel
- * the channel used to send the response back to the client
+ * the channel used to send the response back to the client
+ * @param newLowWatermark
+ *
*/
- void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel channel, MonitoringContext monCtx);
+ void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel channel, MonitoringContext monCtx, Optional<Long> newLowWatermark);
/**
* Allows to send an abort response back to the client.
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index dda4f8d..c350266 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -18,6 +18,7 @@
package org.apache.omid.tso;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.google.inject.name.Named;
@@ -69,9 +70,16 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
private final Meter timestampMeter;
private final Meter fenceMeter;
+ private final LowWatermarkWriter lowWatermarkWriter;
+ private long highestLowWaterMarkSeen;
+
@Inject
ReplyProcessorImpl(@Named("ReplyStrategy") WaitStrategy strategy,
- MetricsRegistry metrics, Panicker panicker, ObjectPool<Batch> batchPool) {
+ MetricsRegistry metrics,
+ Panicker panicker,
+ ObjectPool<Batch> batchPool,
+ LowWatermarkWriter lowWatermarkWriter) {
+ this.lowWatermarkWriter = lowWatermarkWriter;
// ------------------------------------------------------------------------------------------------------------
// Disruptor initialization
@@ -105,6 +113,7 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
LOG.info("ReplyProcessor initialized");
+ highestLowWaterMarkSeen = -1;
}
@VisibleForTesting
@@ -116,7 +125,11 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
switch (event.getType()) {
case COMMIT:
- sendCommitResponse(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel(), event.getMonCtx());
+ sendCommitResponse(event.getStartTimestamp(),
+ event.getCommitTimestamp(),
+ event.getChannel(),
+ event.getMonCtx(),
+ event.getNewLowWatermark());
break;
case ABORT:
sendAbortResponse(event.getStartTimestamp(), event.getChannel(), event.getMonCtx());
@@ -136,7 +149,6 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
}
batchPool.returnObject(batch);
-
}
private void processWaitingEvents() throws Exception {
@@ -180,9 +192,18 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
}
- @Override
- public void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
+ @VisibleForTesting
+ void updateLowWatermark(Optional<Long> newLowwatermark) {
+ if (newLowwatermark.isPresent() && newLowwatermark.get() > highestLowWaterMarkSeen) {
+ highestLowWaterMarkSeen = newLowwatermark.get();
+ lowWatermarkWriter.persistLowWatermark(highestLowWaterMarkSeen);
+ }
+ }
+ @Override
+ public void sendCommitResponse(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx
+ , Optional<Long> newLowWatermark) {
+ updateLowWatermark(newLowWatermark);
TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
TSOProto.CommitResponse.Builder commitBuilder = TSOProto.CommitResponse.newBuilder();
commitBuilder.setAborted(false)
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
index 0a58b0e..a967de4 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorPersistCT.java
@@ -17,6 +17,7 @@
*/
package org.apache.omid.tso;
+import com.google.common.base.Optional;
import com.google.inject.Inject;
import org.apache.omid.metrics.MetricsRegistry;
import org.jboss.netty.channel.Channel;
@@ -42,8 +43,9 @@ public class RequestProcessorPersistCT extends AbstractRequestProcessor {
}
@Override
- public void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
- persistenceProcessor.addCommitToBatch(startTimestamp,commitTimestamp,c,monCtx);
+ public void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx,
+ Optional<Long> lowWatermark) throws Exception {
+ persistenceProcessor.addCommitToBatch(startTimestamp,commitTimestamp,c,monCtx , lowWatermark);
}
@Override
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
index 41798f5..dcfc57e 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorSkipCT.java
@@ -17,6 +17,7 @@
*/
package org.apache.omid.tso;
+import com.google.common.base.Optional;
import com.google.inject.Inject;
import org.apache.omid.metrics.MetricsRegistry;
import org.jboss.netty.channel.Channel;
@@ -56,10 +57,10 @@ public class RequestProcessorSkipCT extends AbstractRequestProcessor {
}
@Override
- public void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx) {
+ public void forwardCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext monCtx, Optional<Long> newLowWatermark) {
commitSuicideIfNotMaster();
monCtx.timerStart("reply.processor.commit.latency");
- replyProcessor.sendCommitResponse(startTimestamp, commitTimestamp, c, monCtx);
+ replyProcessor.sendCommitResponse(startTimestamp, commitTimestamp, c, monCtx, newLowWatermark);
}
@Override
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
index 610e760..e667271 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
@@ -133,7 +133,7 @@ class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>,
if (commitTimestamp.isPresent()) {
if (commitTimestamp.get().isValid()) {
LOG.trace("Tx {}: Valid commit TS found in Commit Table. Sending Commit to client.", startTimestamp);
- replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel(), event.getMonCtx());
+ replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel(), event.getMonCtx(), Optional.<Long>absent());
txAlreadyCommittedMeter.mark();
} else {
LOG.trace("Tx {}: Invalid tx marker found. Sending Abort to client.", startTimestamp);
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
index c286f85..0f230cc 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestBatch.java
@@ -17,6 +17,7 @@
*/
package org.apache.omid.tso;
+import com.google.common.base.Optional;
import org.apache.commons.pool2.PooledObject;
import org.jboss.netty.channel.Channel;
import org.mockito.Mock;
@@ -80,7 +81,7 @@ public class TestBatch {
if (i % 4 == 0) {
batch.addTimestamp(ANY_ST, channel, monCtx);
} else if (i % 4 == 1) {
- batch.addCommit(ANY_ST, ANY_CT, channel, monCtx);
+ batch.addCommit(ANY_ST, ANY_CT, channel, monCtx, Optional.<Long>absent());
} else if (i % 4 == 2) {
batch.addCommitRetry(ANY_ST, channel, monCtx);
} else {
@@ -93,7 +94,7 @@ public class TestBatch {
// Test an exception is thrown when batch is full and a new element is going to be added
try {
- batch.addCommit(ANY_ST, ANY_CT, channel, monCtx);
+ batch.addCommit(ANY_ST, ANY_CT, channel, monCtx, Optional.<Long>absent());
fail("Should throw an IllegalStateException");
} catch (IllegalStateException e) {
assertEquals(e.getMessage(), "batch is full", "message returned doesn't match");
@@ -146,7 +147,7 @@ public class TestBatch {
// Put some elements in the batch...
batch.addTimestamp(ANY_ST, channel, monCtx);
- batch.addCommit(ANY_ST, ANY_CT, channel, monCtx);
+ batch.addCommit(ANY_ST, ANY_CT, channel, monCtx, Optional.<Long>absent());
batch.addCommitRetry(ANY_ST, channel, monCtx);
batch.addAbort(ANY_ST, channel, monCtx);
assertFalse(batch.isEmpty(), "Batch should contain elements");
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
index 779111d..7c6e003 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
@@ -17,6 +17,7 @@
*/
package org.apache.omid.tso;
+import com.google.common.base.Optional;
import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.MetricsRegistry;
@@ -137,7 +138,7 @@ public class TestPanicker {
handlers,
metrics);
- proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics));
+ proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics), Optional.<Long>absent());
LowWatermarkWriter lowWatermarkWriter = new LowWatermarkWriterImpl(config, commitTable, metrics);
@@ -192,7 +193,7 @@ public class TestPanicker {
panicker,
handlers,
metrics);
- proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics));
+ proc.addCommitToBatch(1, 2, null, new MonitoringContextImpl(metrics), Optional.<Long>absent());
LowWatermarkWriter lowWatermarkWriter = new LowWatermarkWriterImpl(config, commitTable, metrics);
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
index 5d9e2c2..eb7d7f6 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
@@ -17,6 +17,7 @@
*/
package org.apache.omid.tso;
+import com.google.common.base.Optional;
import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.MetricsRegistry;
@@ -150,7 +151,7 @@ public class TestPersistenceProcessor {
ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
- ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
+ ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter);
PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
@@ -168,10 +169,10 @@ public class TestPersistenceProcessor {
verify(batchPool, times(1)).borrowObject(); // Called during initialization
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Flush: batch full
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Flush: batch full
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent()); // Flush: batch full
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent()); // Flush: batch full
verify(batchPool, times(1 + BATCH_SIZE_PER_CT_WRITER)).borrowObject(); // 3: 1 in init + 2 when flushing
@@ -193,7 +194,7 @@ public class TestPersistenceProcessor {
ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
- ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
+ ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter);
PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
@@ -213,11 +214,11 @@ public class TestPersistenceProcessor {
verify(batchPool, times(1)).borrowObject(); // Called during initialization
// Fill 1st handler Batches completely
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 1st batch full
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent()); // 1st batch full
verify(batchPool, times(2)).borrowObject();
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class));
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class)); // 2nd batch full
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class), Optional.<Long>absent());
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContext.class), Optional.<Long>absent()); // 2nd batch full
verify(batchPool, times(3)).borrowObject();
// Test empty flush does not trigger response in getting a new currentBatch
@@ -225,14 +226,14 @@ public class TestPersistenceProcessor {
verify(batchPool, times(3)).borrowObject();
// Fill 2nd handler Batches completely
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 1st batch full
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // 2nd batch full
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent()); // 1st batch full
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent()); // 2nd batch full
verify(batchPool, times(1 + (NUM_CT_WRITERS * BATCH_SIZE_PER_CT_WRITER))).borrowObject();
// Start filling a new currentBatch and flush it immediately
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class)); // Batch not full
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent()); // Batch not full
verify(batchPool, times(5)).borrowObject();
proc.triggerCurrentBatchFlush(); // Flushing should provoke invocation of a new batch
verify(batchPool, times(6)).borrowObject();
@@ -260,7 +261,7 @@ public class TestPersistenceProcessor {
ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
- ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
+ ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter);
// Init a non-HA lease manager
VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
@@ -283,7 +284,7 @@ public class TestPersistenceProcessor {
// The non-ha lease manager always return true for
// stillInLeasePeriod(), so verify the currentBatch sends replies as master
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
proc.triggerCurrentBatchFlush();
verify(leaseManager, timeout(1000).times(2)).stillInLeasePeriod();
verify(batchPool, times(2)).borrowObject();
@@ -338,7 +339,7 @@ public class TestPersistenceProcessor {
// Test: Configure the lease manager to return true always
doReturn(true).when(simulatedHALeaseManager).stillInLeasePeriod();
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
proc.triggerCurrentBatchFlush();
verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
verify(batchPool, times(2)).borrowObject();
@@ -359,7 +360,7 @@ public class TestPersistenceProcessor {
// Test: Configure the lease manager to return true first and false later for stillInLeasePeriod
doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
proc.triggerCurrentBatchFlush();
verify(simulatedHALeaseManager, timeout(1000).times(2)).stillInLeasePeriod();
verify(batchPool, times(2)).borrowObject();
@@ -380,7 +381,7 @@ public class TestPersistenceProcessor {
// Test: Configure the lease manager to return false for stillInLeasePeriod
doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
proc.triggerCurrentBatchFlush();
verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
verify(batchPool, times(2)).borrowObject();
@@ -404,7 +405,7 @@ public class TestPersistenceProcessor {
// Configure mock writer to flush unsuccessfully
doThrow(new IOException("Unable to write")).when(mockWriter).flush();
doReturn(true).doReturn(false).when(simulatedHALeaseManager).stillInLeasePeriod();
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class));
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), mock(MonitoringContextImpl.class), Optional.<Long>absent());
proc.triggerCurrentBatchFlush();
verify(simulatedHALeaseManager, timeout(1000).times(1)).stillInLeasePeriod();
verify(batchPool, times(2)).borrowObject();
@@ -421,7 +422,7 @@ public class TestPersistenceProcessor {
"localhost:1234",
leaseManager,
commitTable,
- new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool),
+ new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter),
retryProcessor,
new RuntimeExceptionPanicker());
}
@@ -438,7 +439,7 @@ public class TestPersistenceProcessor {
ObjectPool<Batch> batchPool = spy(new BatchPoolModule(config).getBatchPool());
- ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
+ ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter);
PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
@@ -463,7 +464,7 @@ public class TestPersistenceProcessor {
doThrow(new IOException("Unable to write@TestPersistenceProcessor2")).when(mockWriter).flush();
// Check the panic is extended!
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx, Optional.<Long>absent());
proc.triggerCurrentBatchFlush();
verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
@@ -476,7 +477,7 @@ public class TestPersistenceProcessor {
ObjectPool<Batch> batchPool = new BatchPoolModule(config).getBatchPool();
- ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
+ ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter);
PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
@@ -497,7 +498,7 @@ public class TestPersistenceProcessor {
MonitoringContextImpl monCtx = new MonitoringContextImpl(metrics);
// Check the panic is extended!
- proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx);
+ proc.addCommitToBatch(ANY_ST, ANY_CT, mock(Channel.class), monCtx, Optional.<Long>absent());
proc.triggerCurrentBatchFlush();
verify(panicker, timeout(1000).atLeastOnce()).panic(anyString(), any(Throwable.class));
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
index 4f190f9..473c981 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessorHandler.java
@@ -17,6 +17,7 @@
*/
package org.apache.omid.tso;
+import com.google.common.base.Optional;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.metrics.NullMetricsProvider;
@@ -197,7 +198,7 @@ public class TestPersistenceProcessorHandler {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
+ batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
persistenceHandler.onEvent(batchEvent);
@@ -256,7 +257,7 @@ public class TestPersistenceProcessorHandler {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
+ batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -286,7 +287,7 @@ public class TestPersistenceProcessorHandler {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
batch.addCommitRetry(FIRST_ST, null, mock(MonitoringContextImpl.class));
- batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContextImpl.class));
+ batch.addCommit(SECOND_ST, SECOND_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -366,9 +367,9 @@ public class TestPersistenceProcessorHandler {
batch.addTimestamp(FIRST_ST, null, mock(MonitoringContextImpl.class));
batch.addCommitRetry(SECOND_ST, null, mock(MonitoringContextImpl.class));
- batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContextImpl.class));
+ batch.addCommit(THIRD_ST, THIRD_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
batch.addAbort(FOURTH_ST, null, mock(MonitoringContextImpl.class));
- batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContextImpl.class));
+ batch.addCommit(FIFTH_ST, FIFTH_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
batch.addCommitRetry(SIXTH_ST, null, mock(MonitoringContextImpl.class));
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -408,7 +409,7 @@ public class TestPersistenceProcessorHandler {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
+ batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -450,7 +451,7 @@ public class TestPersistenceProcessorHandler {
// Prepare test batch
Batch batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
+ batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
PersistBatchEvent batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
@@ -485,7 +486,7 @@ public class TestPersistenceProcessorHandler {
// Prepare test batch
batch = new Batch(BATCH_ID, BATCH_SIZE);
- batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class));
+ batch.addCommit(FIRST_ST, FIRST_CT, null, mock(MonitoringContextImpl.class), Optional.<Long>absent());
batchEvent = new PersistBatchEvent();
PersistBatchEvent.makePersistBatch(batchEvent, BATCH_SEQUENCE, batch);
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
index 54d1e70..4659fa3 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
@@ -17,6 +17,8 @@
*/
package org.apache.omid.tso;
+import com.google.common.base.Optional;
+import com.google.common.util.concurrent.SettableFuture;
import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.metrics.NullMetricsProvider;
@@ -35,12 +37,7 @@ import com.lmax.disruptor.BlockingWaitStrategy;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@@ -65,6 +62,7 @@ public class TestReplyProcessor {
private static final long FIFTH_ST = 8L;
private static final long FIFTH_CT = 9L;
private static final long SIXTH_ST = 10L;
+ private static final long SIXTH_CT = 11L;
@Mock
private Panicker panicker;
@@ -78,6 +76,7 @@ public class TestReplyProcessor {
// Component under test
private ReplyProcessorImpl replyProcessor;
+ private LowWatermarkWriter lowWatermarkWriter;
@BeforeMethod(alwaysRun = true, timeOut = 30_000)
public void initMocksAndComponents() throws Exception {
@@ -92,8 +91,13 @@ public class TestReplyProcessor {
batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
- replyProcessor = spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool));
+ lowWatermarkWriter = mock(LowWatermarkWriter.class);
+ SettableFuture<Void> f = SettableFuture.create();
+ f.set(null);
+ doReturn(f).when(lowWatermarkWriter).persistLowWatermark(any(Long.class));
+
+ replyProcessor = spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool, lowWatermarkWriter));
}
@AfterMethod
@@ -104,7 +108,7 @@ public class TestReplyProcessor {
public void testBadFormedPackageThrowsException() throws Exception {
// We need an instance throwing exceptions for this test
- replyProcessor = spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, new RuntimeExceptionPanicker(), batchPool));
+ replyProcessor = spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, new RuntimeExceptionPanicker(), batchPool, lowWatermarkWriter));
// Prepare test batch
Batch batch = batchPool.borrowObject();
@@ -189,7 +193,7 @@ public class TestReplyProcessor {
// Prepare first a delayed batch (Batch #3)
Batch thirdBatch = batchPool.borrowObject();
thirdBatch.addTimestamp(FIRST_ST, mock(Channel.class), monCtx);
- thirdBatch.addCommit(SECOND_ST, SECOND_CT, mock(Channel.class), monCtx);
+ thirdBatch.addCommit(SECOND_ST, SECOND_CT, mock(Channel.class), monCtx, Optional.<Long>absent());
ReplyBatchEvent thirdBatchEvent = ReplyBatchEvent.EVENT_FACTORY.newInstance();
ReplyBatchEvent.makeReplyBatch(thirdBatchEvent, thirdBatch, 2); // Set a higher sequence than the initial one
@@ -210,7 +214,7 @@ public class TestReplyProcessor {
// Prepare another delayed batch (Batch #2)
Batch secondBatch = batchPool.borrowObject();
secondBatch.addTimestamp(THIRD_ST, mock(Channel.class), monCtx);
- secondBatch.addCommit(FOURTH_ST, FOURTH_CT, mock(Channel.class), monCtx);
+ secondBatch.addCommit(FOURTH_ST, FOURTH_CT, mock(Channel.class), monCtx, Optional.<Long>absent());
ReplyBatchEvent secondBatchEvent = ReplyBatchEvent.EVENT_FACTORY.newInstance();
ReplyBatchEvent.makeReplyBatch(secondBatchEvent, secondBatch, 1); // Set another higher sequence
@@ -249,9 +253,41 @@ public class TestReplyProcessor {
InOrder inOrderReplies = inOrder(replyProcessor, replyProcessor, replyProcessor, replyProcessor, replyProcessor);
inOrderReplies.verify(replyProcessor, times(1)).sendAbortResponse(eq(FIFTH_ST), any(Channel.class), eq(monCtx));
inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(THIRD_ST), any(Channel.class), eq(monCtx));
- inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(FOURTH_ST), eq(FOURTH_CT), any(Channel.class), eq(monCtx));
+ inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(FOURTH_ST), eq(FOURTH_CT), any(Channel.class), eq(monCtx), any(Optional.class));
inOrderReplies.verify(replyProcessor, times(1)).sendTimestampResponse(eq(FIRST_ST), any(Channel.class), eq(monCtx));
- inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(SECOND_ST), eq(SECOND_CT), any(Channel.class), eq(monCtx));
+ inOrderReplies.verify(replyProcessor, times(1)).sendCommitResponse(eq(SECOND_ST), eq(SECOND_CT), any(Channel.class), eq(monCtx), any(Optional.class));
+
+ }
+
+ @Test
+ public void testUpdateLowWaterMarkOnlyForMaxInBatch() throws Exception {
+
+ Batch thirdBatch = batchPool.borrowObject();
+ thirdBatch.addTimestamp(FIRST_ST, mock(Channel.class), monCtx);
+ thirdBatch.addCommit(SECOND_ST, SECOND_CT, mock(Channel.class), monCtx, Optional.of(100L));
+ thirdBatch.addCommit(THIRD_ST, THIRD_CT, mock(Channel.class), monCtx, Optional.of(50L));
+ thirdBatch.addCommit(FOURTH_ST, FOURTH_CT, mock(Channel.class), monCtx, Optional.<Long>absent());
+ thirdBatch.addCommit(FIFTH_ST, FIFTH_CT, mock(Channel.class), monCtx, Optional.of(100L));
+ thirdBatch.addCommit(SIXTH_ST, SIXTH_CT, mock(Channel.class), monCtx, Optional.of(150L));
+
+ ReplyBatchEvent thirdBatchEvent = ReplyBatchEvent.EVENT_FACTORY.newInstance();
+ ReplyBatchEvent.makeReplyBatch(thirdBatchEvent, thirdBatch, 0);
+
+ replyProcessor.onEvent(thirdBatchEvent, ANY_DISRUPTOR_SEQUENCE, false);
+
+ InOrder inOrderWatermarkWriter = inOrder(lowWatermarkWriter, lowWatermarkWriter, lowWatermarkWriter);
+
+ inOrderWatermarkWriter.verify(lowWatermarkWriter, times(1)).persistLowWatermark(eq(100L));
+ inOrderWatermarkWriter.verify(lowWatermarkWriter, times(1)).persistLowWatermark(eq(150L));
+
+ verify(lowWatermarkWriter, timeout(100).never()).persistLowWatermark(eq(50L));
+
+ InOrder inOrderCheckLWM = inOrder(replyProcessor, replyProcessor,replyProcessor,replyProcessor,replyProcessor);
+ inOrderCheckLWM.verify(replyProcessor, times(1)).updateLowWatermark(Optional.of(100L));
+ inOrderCheckLWM.verify(replyProcessor, times(1)).updateLowWatermark(Optional.of(50L));
+ inOrderCheckLWM.verify(replyProcessor, times(1)).updateLowWatermark(Optional.<Long>absent());
+ inOrderCheckLWM.verify(replyProcessor, times(1)).updateLowWatermark(Optional.of(100L));
+ inOrderCheckLWM.verify(replyProcessor, times(1)).updateLowWatermark(Optional.of(150L));
}
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
index 02a5387..b7abf2c 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
@@ -17,6 +17,7 @@
*/
package org.apache.omid.tso;
+import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
@@ -126,7 +127,7 @@ public class TestRequestProcessor {
requestProc.commitRequest(firstTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
ArgumentCaptor<Long> commitTScapture = ArgumentCaptor.forClass(Long.class);
- verify(persist, timeout(100).times(1)).addCommitToBatch(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
+ verify(persist, timeout(100).times(1)).addCommitToBatch(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContext.class), any(Optional.class));
assertTrue(commitTScapture.getValue() > firstTS, "Commit TS must be greater than start TS");
// test conflict
@@ -143,14 +144,14 @@ public class TestRequestProcessor {
long thirdTS = TScapture.getValue();
requestProc.commitRequest(thirdTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
- verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContextImpl.class));
+ verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContextImpl.class), any(Optional.class));
requestProc.commitRequest(secondTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), any(Channel.class), any(MonitoringContextImpl.class));
}
@Test(timeOut = 30_000)
- public void testFence() throws Exception {
+ public void testFence() {
requestProc.fenceRequest(666L, null, new MonitoringContextImpl(metrics));
ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
@@ -182,10 +183,9 @@ public class TestRequestProcessor {
}
- @Test(timeOut = 5_000)
- public void testLowWatermarkIsStoredOnlyWhenACacheElementIsEvicted() throws Exception {
-
- final int ANY_START_TS = 1;
+ @Test(timeOut=5_000)
+ public void testLowWaterIsForwardedWhenACacheElementIsEvicted() throws Exception {
+ final long ANY_START_TS = 1;
final long FIRST_COMMIT_TS_EVICTED = CommitTable.MAX_CHECKPOINTS_PER_TXN;
final long NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED = FIRST_COMMIT_TS_EVICTED + CommitTable.MAX_CHECKPOINTS_PER_TXN;
@@ -196,15 +196,15 @@ public class TestRequestProcessor {
requestProc.commitRequest(ANY_START_TS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContextImpl(metrics));
}
- Thread.currentThread().sleep(3000); // Allow the Request processor to finish the request processing
+ Thread.sleep(3000); // Allow the Request processor to finish the request processing
// Check that first time its called is on init
verify(lowWatermarkWriter, timeout(100).times(1)).persistLowWatermark(eq(0L));
// Then, check it is called when cache is full and the first element is evicted (should be a AbstractTransactionManager.NUM_OF_CHECKPOINTS)
- verify(lowWatermarkWriter, timeout(100).times(1)).persistLowWatermark(eq(FIRST_COMMIT_TS_EVICTED));
+ verify(persist, timeout(100).times(1)).addCommitToBatch(eq(ANY_START_TS), anyLong(), any(Channel.class), any(MonitoringContextImpl.class), eq(Optional.of(FIRST_COMMIT_TS_EVICTED)));
// Finally it should never be called with the next element
- verify(lowWatermarkWriter, timeout(100).never()).persistLowWatermark(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED));
+ verify(persist, timeout(100).never()).addCommitToBatch(eq(ANY_START_TS), anyLong(), any(Channel.class), any(MonitoringContextImpl.class), eq(Optional.of(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED)));
- }
+ }
}
\ No newline at end of file
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
index 5476f90..b2b8694 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
@@ -99,7 +99,7 @@ public class TestRetryProcessor {
verify(replyProc, timeout(100).times(1)).sendCommitResponse(firstTSCapture.capture(),
secondTSCapture.capture(),
- any(Channel.class), any(MonitoringContextImpl.class));
+ any(Channel.class), any(MonitoringContextImpl.class), any(Optional.class));
long startTS = firstTSCapture.getValue();
long commitTS = secondTSCapture.getValue();
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
index 4098f6c..2e069fa 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestIntegrationOfTSOClientServerBasicFunctionality.java
@@ -18,12 +18,15 @@
package org.apache.omid.tso.client;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.omid.TestUtils;
import org.apache.omid.committable.CommitTable;
+import org.apache.omid.tso.LowWatermarkWriter;
import org.apache.omid.tso.TSOMockModule;
import org.apache.omid.tso.TSOServer;
import org.apache.omid.tso.TSOServerConfig;
@@ -39,6 +42,8 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
@@ -60,6 +65,7 @@ public class TestIntegrationOfTSOClientServerBasicFunctionality {
private TSOClient tsoClient;
private TSOClient justAnotherTSOClient;
private CommitTable.Client commitTableClient;
+ private LowWatermarkWriter lowWatermarkWriter;
@BeforeClass
public void setup() throws Exception {
@@ -72,6 +78,8 @@ public class TestIntegrationOfTSOClientServerBasicFunctionality {
Module tsoServerMockModule = new TSOMockModule(tsoConfig);
Injector injector = Guice.createInjector(tsoServerMockModule);
+ lowWatermarkWriter = injector.getInstance(LowWatermarkWriter.class);
+
CommitTable commitTable = injector.getInstance(CommitTable.class);
commitTableClient = commitTable.getClient();
@@ -288,4 +296,24 @@ public class TestIntegrationOfTSOClientServerBasicFunctionality {
assertTrue(commitTSTx1 < startTsTx4Client2, "Tx1 committed before Tx4 started on the other TSO Client");
}
+ @Test(timeOut = 30_000)
+ public void testLowWaterMarksgetAdvanced() throws ExecutionException, InterruptedException {
+
+ long startTsTx1Client1 = tsoClient.getNewStartTimestamp().get();
+ HashSet<DummyCellIdImpl> ws = new HashSet<>();
+ for (int i=0; i< 1000*32; ++i) {
+ ws.add(new DummyCellIdImpl(i));
+ }
+
+ Long beforeCommitLWM = commitTableClient.readLowWatermark().get();
+
+ Long commitTSTx1 = tsoClient.commit(startTsTx1Client1, ws).get();
+
+ Thread.sleep(300);
+
+ Long afterCommitLWM = commitTableClient.readLowWatermark().get();
+
+ assert(afterCommitLWM > beforeCommitLWM);
+ }
+
}