You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by fp...@apache.org on 2018/01/31 00:00:46 UTC
[2/4] incubator-omid git commit: [OMID-79] Revert to the state of the
project at 0c371361781957c96b20295290a167f7be3b33e2
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
index ec0302c..e7dc8cf 100644
--- a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
+++ b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
@@ -19,9 +19,7 @@ package org.apache.omid.transaction;
import com.google.common.base.Function;
import com.google.common.base.Optional;
-import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.Futures;
-
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.CommitTimestamp;
import org.apache.omid.metrics.Counter;
@@ -58,8 +56,6 @@ public abstract class AbstractTransactionManager implements TransactionManager {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionManager.class);
- public final static int MAX_CHECKPOINTS_PER_TXN = 50;
-
public interface TransactionFactory<T extends CellId> {
AbstractTransaction<T> createTransaction(long transactionId, long epoch, AbstractTransactionManager tm);
@@ -74,7 +70,6 @@ public abstract class AbstractTransactionManager implements TransactionManager {
// Metrics
private final Timer startTimestampTimer;
private final Timer commitTimer;
- private final Timer fenceTimer;
private final Counter committedTxsCounter;
private final Counter rolledbackTxsCounter;
private final Counter errorTxsCounter;
@@ -109,7 +104,6 @@ public abstract class AbstractTransactionManager implements TransactionManager {
// Metrics configuration
this.startTimestampTimer = metrics.timer(name("omid", "tm", "hbase", "startTimestamp", "latency"));
this.commitTimer = metrics.timer(name("omid", "tm", "hbase", "commit", "latency"));
- this.fenceTimer = metrics.timer(name("omid", "tm", "hbase", "fence", "latency"));
this.committedTxsCounter = metrics.counter(name("omid", "tm", "hbase", "committedTxs"));
this.rolledbackTxsCounter = metrics.counter(name("omid", "tm", "hbase", "rolledbackTxs"));
this.errorTxsCounter = metrics.counter(name("omid", "tm", "hbase", "erroredTxs"));
@@ -166,48 +160,6 @@ public abstract class AbstractTransactionManager implements TransactionManager {
}
/**
- * Generates hash ID for table name, this hash is later-on sent to the TSO and used for fencing
- * @param tableName - the table name
- * @return
- */
- abstract public long getHashForTable(byte[] tableName);
-
- /**
- * Return the commit table client
- * @return commitTableClient
- */
- public CommitTable.Client getCommitTableClient() {
- return commitTableClient;
- }
-
- /**
- * @see org.apache.omid.transaction.TransactionManager#fence()
- */
- @Override
- public final Transaction fence(byte[] tableName) throws TransactionException {
- long fenceTimestamp;
- long tableID = getHashForTable(tableName); Hashing.murmur3_128().newHasher().putBytes(tableName).hash().asLong();
-
- try {
- fenceTimer.start();
- try {
- fenceTimestamp = tsoClient.getFence(tableID).get();
- } finally {
- fenceTimer.stop();
- }
-
- AbstractTransaction<? extends CellId> tx = transactionFactory.createTransaction(fenceTimestamp, fenceTimestamp, this);
-
- return tx;
- } catch (ExecutionException e) {
- throw new TransactionException("Could not get fence", e);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- throw new TransactionException("Interrupted creating a fence", ie);
- }
- }
-
- /**
* Allows transaction manager developers to perform actions after having started a transaction.
* @param transaction
* the transaction that was just created.
@@ -312,6 +264,98 @@ public abstract class AbstractTransactionManager implements TransactionManager {
public void postRollback(AbstractTransaction<? extends CellId> transaction) throws TransactionManagerException {}
/**
+ * Check if the transaction commit data is in the shadow cell
+ * @param cellStartTimestamp
+ * the transaction start timestamp
+ * @param locator
+ * the timestamp locator
+ * @throws IOException
+ */
+ Optional<CommitTimestamp> readCommitTimestampFromShadowCell(long cellStartTimestamp, CommitTimestampLocator locator)
+ throws IOException
+ {
+
+ Optional<CommitTimestamp> commitTS = Optional.absent();
+
+ Optional<Long> commitTimestamp = locator.readCommitTimestampFromShadowCell(cellStartTimestamp);
+ if (commitTimestamp.isPresent()) {
+ commitTS = Optional.of(new CommitTimestamp(SHADOW_CELL, commitTimestamp.get(), true)); // Valid commit TS
+ }
+
+ return commitTS;
+ }
+
+ /**
+ * This function returns the commit timestamp for a particular cell if the transaction was already committed in
+ * the system. In case the transaction was not committed and the cell was written by a transaction started under a
+ * previous TSO server, an invalidation attempt is made.
+ * Otherwise the function returns a value that indicates that the commit timestamp was not found.
+ * @param cellStartTimestamp
+ * start timestamp of the cell to locate the commit timestamp for.
+ * @param epoch
+ * the epoch of the TSO server the current tso client is working with.
+ * @param locator
+ * a locator to find the commit timestamp in the system.
+ * @return the commit timestamp together with the location where it was found
+ * or an object indicating that it was not found in the system
+ * @throws IOException in case of any I/O issues
+ */
+ public CommitTimestamp locateCellCommitTimestamp(long cellStartTimestamp, long epoch,
+ CommitTimestampLocator locator) throws IOException {
+
+ try {
+ // 1) First check the cache
+ Optional<Long> commitTimestamp = locator.readCommitTimestampFromCache(cellStartTimestamp);
+ if (commitTimestamp.isPresent()) { // Valid commit timestamp
+ return new CommitTimestamp(CACHE, commitTimestamp.get(), true);
+ }
+
+ // 2) Then check the commit table
+ // If the data was written at a previous epoch, check whether the transaction was invalidated
+ Optional<CommitTimestamp> commitTimeStamp = commitTableClient.getCommitTimestamp(cellStartTimestamp).get();
+ if (commitTimeStamp.isPresent()) {
+ return commitTimeStamp.get();
+ }
+
+ // 3) Read from shadow cell
+ commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
+ if (commitTimeStamp.isPresent()) {
+ return commitTimeStamp.get();
+ }
+
+ // 4) Check the epoch and invalidate the entry
+ // if the data was written by a transaction from a previous epoch (previous TSO)
+ if (cellStartTimestamp < epoch) {
+ boolean invalidated = commitTableClient.tryInvalidateTransaction(cellStartTimestamp).get();
+ if (invalidated) { // Invalid commit timestamp
+ return new CommitTimestamp(COMMIT_TABLE, CommitTable.INVALID_TRANSACTION_MARKER, false);
+ }
+ }
+
+ // 5) We did not manage to invalidate the transaction, so check the commit table again
+ commitTimeStamp = commitTableClient.getCommitTimestamp(cellStartTimestamp).get();
+ if (commitTimeStamp.isPresent()) {
+ return commitTimeStamp.get();
+ }
+
+ // 6) Read from shadow cell
+ commitTimeStamp = readCommitTimestampFromShadowCell(cellStartTimestamp, locator);
+ if (commitTimeStamp.isPresent()) {
+ return commitTimeStamp.get();
+ }
+
+ // *) Otherwise return not found
+ return new CommitTimestamp(NOT_PRESENT, -1L /** TODO Check if we should return this */, true);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted while finding commit timestamp", e);
+ } catch (ExecutionException e) {
+ throw new IOException("Problem finding commit timestamp", e);
+ }
+
+ }
+
+ /**
* @see java.io.Closeable#close()
*/
@Override
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/transaction-client/src/main/java/org/apache/omid/transaction/TransactionManager.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/transaction/TransactionManager.java b/transaction-client/src/main/java/org/apache/omid/transaction/TransactionManager.java
index 3b272da..239aafc 100644
--- a/transaction-client/src/main/java/org/apache/omid/transaction/TransactionManager.java
+++ b/transaction-client/src/main/java/org/apache/omid/transaction/TransactionManager.java
@@ -58,15 +58,4 @@ public interface TransactionManager extends Closeable {
*/
void rollback(Transaction tx) throws TransactionException;
- /**
- * Creates a fence
- *
- * Creates a fence and returns a {@link Transaction} interface implementation that contains the fence information.
- *
- * @param tableName name of the table that requires a fence
- * @return transaction representation contains the fence timestamp as the TransactionId.
- * @throws TransactionException in case of any issues
- */
- Transaction fence(byte[] tableName) throws TransactionException;
-
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/transaction-client/src/main/java/org/apache/omid/tso/client/CellId.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/CellId.java b/transaction-client/src/main/java/org/apache/omid/tso/client/CellId.java
index 9643960..e40105e 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/CellId.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/CellId.java
@@ -21,7 +21,4 @@ public interface CellId {
long getCellId();
- long getTableId();
-
- long getRowId();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java
index 344d343..b4a205e 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/MockTSOClient.java
@@ -18,21 +18,16 @@
package org.apache.omid.tso.client;
import com.google.common.util.concurrent.SettableFuture;
-
import org.apache.omid.committable.CommitTable;
import java.io.IOException;
-import java.util.HashSet;
import java.util.Set;
-import java.util.Map;
-import java.util.HashMap;
import java.util.concurrent.atomic.AtomicLong;
class MockTSOClient implements TSOProtocol {
private final AtomicLong timestampGenerator = new AtomicLong();
private static final int CONFLICT_MAP_SIZE = 1_000_000;
private final long[] conflictMap = new long[CONFLICT_MAP_SIZE];
- private final Map<Long, Long> fenceMap = new HashMap<Long, Long>();
private final AtomicLong lwm = new AtomicLong();
private final CommitTable.Writer commitTable;
@@ -51,58 +46,6 @@ class MockTSOClient implements TSOProtocol {
}
@Override
- public TSOFuture<Long> getFence(long tableId) {
- synchronized (conflictMap) {
- SettableFuture<Long> f = SettableFuture.create();
- long fenceTimestamp = timestampGenerator.incrementAndGet();
- f.set(fenceTimestamp);
- fenceMap.put(tableId, fenceTimestamp);
- try {
- // Persist the fence by using the fence identifier as both the start and commit timestamp.
- commitTable.addCommittedTransaction(fenceTimestamp, fenceTimestamp);
- commitTable.flush();
- } catch (IOException ioe) {
- f.setException(ioe);
- }
- return new ForwardingTSOFuture<>(f);
- }
- }
-
- // Checks whether transaction transactionId started before a fence creation of a table transactionId modified.
- private boolean hasConflictsWithFences(long transactionId, Set<? extends CellId> cells) {
- Set<Long> tableIDs = new HashSet<Long>();
- for (CellId c : cells) {
- tableIDs.add(c.getTableId());
- }
-
- if (! fenceMap.isEmpty()) {
- for (long tableId : tableIDs) {
- Long fence = fenceMap.get(tableId);
- if (fence != null && transactionId < fence) {
- return true;
- }
- if (fence != null && fence < lwm.get()) { // GC
- fenceMap.remove(tableId);
- }
- }
- }
-
- return false;
- }
-
- // Checks whether transactionId has a write-write conflict with a transaction committed after transactionId.
- private boolean hasConflictsWithCommittedTransactions(long transactionId, Set<? extends CellId> cells) {
- for (CellId c : cells) {
- int index = Math.abs((int) (c.getCellId() % CONFLICT_MAP_SIZE));
- if (conflictMap[index] >= transactionId) {
- return true;
- }
- }
-
- return false;
- }
-
- @Override
public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells) {
synchronized (conflictMap) {
SettableFuture<Long> f = SettableFuture.create();
@@ -111,9 +54,16 @@ class MockTSOClient implements TSOProtocol {
return new ForwardingTSOFuture<>(f);
}
- if (!hasConflictsWithFences(transactionId, cells) &&
- !hasConflictsWithCommittedTransactions(transactionId, cells)) {
+ boolean canCommit = true;
+ for (CellId c : cells) {
+ int index = Math.abs((int) (c.getCellId() % CONFLICT_MAP_SIZE));
+ if (conflictMap[index] >= transactionId) {
+ canCommit = false;
+ break;
+ }
+ }
+ if (canCommit) {
long commitTimestamp = timestampGenerator.incrementAndGet();
for (CellId c : cells) {
int index = Math.abs((int) (c.getCellId() % CONFLICT_MAP_SIZE));
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java b/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java
index 6bc6481..3542c55 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java
@@ -32,8 +32,6 @@ public class OmidClientConfiguration {
public enum PostCommitMode {SYNC, ASYNC}
- public enum ConflictDetectionLevel {CELL, ROW}
-
// Basic connection related params
private ConnType connectionType = ConnType.DIRECT;
@@ -53,7 +51,6 @@ public class OmidClientConfiguration {
// Transaction Manager related params
private PostCommitMode postCommitMode = PostCommitMode.SYNC;
- private ConflictDetectionLevel conflictAnalysisLevel = ConflictDetectionLevel.CELL;
// ----------------------------------------------------------------------------------------------------------------
// Instantiation
@@ -177,13 +174,4 @@ public class OmidClientConfiguration {
this.postCommitMode = postCommitMode;
}
- public ConflictDetectionLevel getConflictAnalysisLevel() {
- return conflictAnalysisLevel;
- }
-
- @Inject(optional = true)
- @Named("omid.tm.conflictAnalysisLevel")
- public void setConflictAnalysisLevel(ConflictDetectionLevel conflictAnalysisLevel) {
- this.conflictAnalysisLevel = conflictAnalysisLevel;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
index 1c62876..1690ca6 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
@@ -21,10 +21,7 @@ import com.google.common.base.Charsets;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
import org.apache.omid.proto.TSOProto;
-import org.apache.omid.transaction.TransactionException;
-import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
import org.apache.omid.zk.ZKUtils;
import org.apache.statemachine.StateMachine;
import org.apache.curator.framework.CuratorFramework;
@@ -57,9 +54,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
@@ -68,7 +63,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-
/**
* Describes the abstract methods to communicate to the TSO server
*/
@@ -98,13 +92,6 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
private InetSocketAddress tsoAddr;
private String zkCurrentTsoPath;
-
- // Use to extract unique table identifiers from the modified cells list.
- private final Set<Long> tableIDs;
- // Conflict detection level of the entire system. Can either be Row or Cell level.
- private ConflictDetectionLevel conflictDetectionLevel;
- private Set<Long> rowLevelWriteSet;
-
// ----------------------------------------------------------------------------------------------------------------
// Construction
// ----------------------------------------------------------------------------------------------------------------
@@ -172,11 +159,6 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("reuseAddress", true);
bootstrap.setOption("connectTimeoutMillis", 100);
-
- this.tableIDs = new HashSet<Long>();
-
- conflictDetectionLevel = omidConf.getConflictAnalysisLevel();
- rowLevelWriteSet = new HashSet<Long>();
}
// ----------------------------------------------------------------------------------------------------------------
@@ -204,33 +186,9 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
TSOProto.CommitRequest.Builder commitbuilder = TSOProto.CommitRequest.newBuilder();
commitbuilder.setStartTimestamp(transactionId);
-
- rowLevelWriteSet.clear();
for (CellId cell : cells) {
- long id;
-
- switch (conflictDetectionLevel) {
- case ROW:
- id = cell.getRowId();
- if (rowLevelWriteSet.contains(id)) {
- continue;
- } else {
- rowLevelWriteSet.add(id);
- }
- break;
- case CELL:
- id = cell.getCellId();
- break;
- default:
- id = 0;
- assert (false);
- }
-
- commitbuilder.addCellId(id);
- tableIDs.add(cell.getTableId());
+ commitbuilder.addCellId(cell.getCellId());
}
- commitbuilder.addAllTableId(tableIDs);
- tableIDs.clear();
builder.setCommitRequest(commitbuilder.build());
RequestEvent request = new RequestEvent(builder.build(), requestMaxRetries);
fsm.sendEvent(request);
@@ -238,20 +196,6 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
}
/**
- * @see TSOProtocol#getFence()
- */
- @Override
- public TSOFuture<Long> getFence(long tableId) {
- TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
- TSOProto.FenceRequest.Builder fenceReqBuilder = TSOProto.FenceRequest.newBuilder();
- fenceReqBuilder.setTableId(tableId);
- builder.setFenceRequest(fenceReqBuilder.build());
- RequestEvent request = new RequestEvent(builder.build(), requestMaxRetries);
- fsm.sendEvent(request);
- return new ForwardingTSOFuture<>(request);
- }
-
- /**
* @see TSOProtocol#close()
*/
@Override
@@ -299,14 +243,6 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
return epoch;
}
- /**
- * Used for family deletion
- * @return the conflict detection level.
- */
- public ConflictDetectionLevel getConflictDetectionLevel() {
- return conflictDetectionLevel;
- }
-
// ----------------------------------------------------------------------------------------------------------------
// NodeCacheListener interface
// ----------------------------------------------------------------------------------------------------------------
@@ -410,19 +346,6 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
}
}
- private static class FenceRequestTimeoutEvent implements StateMachine.Event {
-
- final long tableID;
-
- FenceRequestTimeoutEvent(long tableID) {
- this.tableID = tableID;
- }
-
- public long getTableID() {
- return tableID;
- }
- }
-
private static class RequestEvent extends UserEvent<Long> {
TSOProto.Request req;
@@ -692,7 +615,6 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
final Queue<RequestAndTimeout> timestampRequests;
final Map<Long, RequestAndTimeout> commitRequests;
- final Map<Long, RequestAndTimeout> fenceRequests;
final Channel channel;
final HashedWheelTimer timeoutExecutor;
@@ -704,7 +626,6 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
this.timeoutExecutor = timeoutExecutor;
timestampRequests = new ArrayDeque<>();
commitRequests = new HashMap<>();
- fenceRequests = new HashMap<>();
}
private Timeout newTimeout(final StateMachine.Event timeoutEvent) {
@@ -729,10 +650,6 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
TSOProto.CommitRequest commitReq = req.getCommitRequest();
commitRequests.put(commitReq.getStartTimestamp(), new RequestAndTimeout(
request, newTimeout(new CommitRequestTimeoutEvent(commitReq.getStartTimestamp()))));
- } else if (req.hasFenceRequest()) {
- TSOProto.FenceRequest fenceReq = req.getFenceRequest();
- fenceRequests.put(fenceReq.getTableId(), new RequestAndTimeout(
- request, newTimeout(new FenceRequestTimeoutEvent(fenceReq.getTableId()))));
} else {
request.error(new IllegalArgumentException("Unknown request type"));
return;
@@ -776,18 +693,6 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
} else {
e.getRequest().success(resp.getCommitResponse().getCommitTimestamp());
}
- } else if (resp.hasFenceResponse()) {
- long tableID = resp.getFenceResponse().getTableId();
- RequestAndTimeout e = fenceRequests.remove(tableID);
- if (e == null) {
- LOG.debug("Received fence response for request that doesn't exist. Table ID: {}", tableID);
- return;
- }
- if (e.getTimeout() != null) {
- e.getTimeout().cancel();
- }
-
- e.getRequest().success(resp.getFenceResponse().getFenceId());
}
}
@@ -814,18 +719,6 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
return this;
}
- public StateMachine.State handleEvent(FenceRequestTimeoutEvent e) {
- long tableID = e.getTableID();
- if (fenceRequests.containsKey(tableID)) {
- RequestAndTimeout r = fenceRequests.remove(tableID);
- if (r.getTimeout() != null) {
- r.getTimeout().cancel();
- }
- queueRetryOrError(fsm, r.getRequest());
- }
- return this;
- }
-
public StateMachine.State handleEvent(CloseEvent e) {
LOG.debug("CONNECTED STATE: CloseEvent");
timeoutExecutor.stop();
@@ -869,15 +762,6 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
queueRetryOrError(fsm, r.getRequest());
iter.remove();
}
- iter = fenceRequests.entrySet().iterator();
- while (iter.hasNext()) {
- RequestAndTimeout r = iter.next().getValue();
- if (r.getTimeout() != null) {
- r.getTimeout().cancel();
- }
- queueRetryOrError(fsm, r.getRequest());
- iter.remove();
- }
channel.close();
}
@@ -916,12 +800,6 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
}
r.getRequest().error(new ClosingException());
}
- for (RequestAndTimeout r : fenceRequests.values()) {
- if (r.getTimeout() != null) {
- r.getTimeout().cancel();
- }
- r.getRequest().error(new ClosingException());
- }
}
}
@@ -942,11 +820,6 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
return this;
}
- public StateMachine.State handleEvent(FenceRequestTimeoutEvent e) {
- // Ignored. They will be retried or errored
- return this;
- }
-
public StateMachine.State handleEvent(ErrorEvent e) {
// Ignored. They will be retried or errored
return this;
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java
index 5ad6326..198913a 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java
@@ -17,11 +17,8 @@
*/
package org.apache.omid.tso.client;
-import java.util.List;
import java.util.Set;
-import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
-
/**
* Defines the protocol used on the client side to abstract communication to the TSO server
*/
@@ -51,17 +48,6 @@ public interface TSOProtocol {
TSOFuture<Long> commit(long transactionId, Set<? extends CellId> writeSet);
/**
- * Returns a new fence timestamp assigned by on the server-side
- * @param tableId
- * the table to create fence for.
- * @return the newly assigned timestamp as a future. If an error was detected, the future will contain a
- * corresponding protocol exception
- * see org.apache.omid.tso.TimestampOracle
- * see org.apache.omid.tso.TSOServer
- */
- TSOFuture<Long> getFence(long tableId);
-
- /**
* Closes the communication with the TSO server
* @return nothing. If an error was detected, the future will contain a corresponding protocol exception
*/
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/transaction-client/src/main/java/org/apache/omid/tso/util/DummyCellIdImpl.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/util/DummyCellIdImpl.java b/transaction-client/src/main/java/org/apache/omid/tso/util/DummyCellIdImpl.java
index ab3a385..4556757 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/util/DummyCellIdImpl.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/util/DummyCellIdImpl.java
@@ -22,15 +22,9 @@ import org.apache.omid.tso.client.CellId;
public class DummyCellIdImpl implements CellId {
private final long cellId;
- private final long rowId;
public DummyCellIdImpl(long cellId) {
- this(cellId, cellId);
- }
-
- public DummyCellIdImpl(long cellId, long rowId) {
this.cellId = cellId;
- this.rowId = rowId;
}
@Override
@@ -38,13 +32,4 @@ public class DummyCellIdImpl implements CellId {
return cellId;
}
- @Override
- public long getTableId() {
- return cellId;
- }
-
- @Override
- public long getRowId() {
- return rowId;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/transaction-client/src/main/resources/omid-client-config.yml
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/resources/omid-client-config.yml b/transaction-client/src/main/resources/omid-client-config.yml
index 478bd48..4263c35 100644
--- a/transaction-client/src/main/resources/omid-client-config.yml
+++ b/transaction-client/src/main/resources/omid-client-config.yml
@@ -36,8 +36,4 @@ executorThreads: 3
# Configure whether the TM performs the post-commit actions for a tx (update shadow cells and clean commit table entry)
# before returning to the control to the client (SYNC) or in parallel (ASYNC)
-postCommitMode: !!org.apache.omid.tso.client.OmidClientConfiguration$PostCommitMode SYNC
-
-# Conflict analysis level
-# Can either be cell level or row level. Default is cell level
-conflictDetectionLevel: !!org.apache.omid.tso.client.OmidClientConfiguration$ConflictDetectionLevel CELL
+postCommitMode: !!org.apache.omid.tso.client.OmidClientConfiguration$PostCommitMode SYNC
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/tso-server/pom.xml
----------------------------------------------------------------------
diff --git a/tso-server/pom.xml b/tso-server/pom.xml
index e1488c1..88ec145 100644
--- a/tso-server/pom.xml
+++ b/tso-server/pom.xml
@@ -58,6 +58,7 @@
<groupId>org.apache.omid</groupId>
<artifactId>omid-transaction-client</artifactId>
<version>${project.version}</version>
+ <scope>test</scope>
</dependency>
<!-- End of Dependencies on Omid modules -->
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/tso-server/src/main/java/org/apache/omid/tso/Batch.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/Batch.java b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
index 111c81c..99d0c5c 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/Batch.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/Batch.java
@@ -103,16 +103,6 @@ public class Batch {
}
- void addFence(long tableID, long fenceTimestamp, Channel c, MonitoringContext context) {
-
- Preconditions.checkState(!isFull(), "batch is full");
- int index = numEvents++;
- PersistEvent e = events[index];
- context.timerStart("persistence.processor.fence.latency");
- e.makePersistFence(tableID, fenceTimestamp, c, context);
-
- }
-
void addCommit(long startTimestamp, long commitTimestamp, Channel c, MonitoringContext context) {
Preconditions.checkState(!isFull(), "batch is full");
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
index b89cdc5..db58677 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistEvent.java
@@ -25,7 +25,7 @@ public final class PersistEvent {
private MonitoringContext monCtx;
enum Type {
- TIMESTAMP, COMMIT, ABORT, COMMIT_RETRY, FENCE
+ TIMESTAMP, COMMIT, ABORT, COMMIT_RETRY
}
private Type type = null;
@@ -71,16 +71,6 @@ public final class PersistEvent {
}
- void makePersistFence(long tableID, long fenceTimestamp, Channel c, MonitoringContext monCtx) {
-
- this.type = Type.FENCE;
- this.startTimestamp = tableID;
- this.commitTimestamp = fenceTimestamp;
- this.channel = c;
- this.monCtx = monCtx;
-
- }
-
MonitoringContext getMonCtx() {
return monCtx;
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
index ddebf13..b96945d 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessor.java
@@ -33,8 +33,6 @@ interface PersistenceProcessor extends Closeable {
void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
- void addFenceToBatch(long tableID, long fenceTimestamp, Channel c, MonitoringContext monCtx) throws Exception;
-
void triggerCurrentBatchFlush() throws Exception;
Future<Void> persistLowWatermark(long lowWatermark);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
index a6d63c7..07241f0 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java
@@ -95,11 +95,6 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
case ABORT:
event.getMonCtx().timerStop("persistence.processor.abort.latency");
break;
- case FENCE:
- // Persist the fence by using the fence identifier as both the start and commit timestamp.
- writer.addCommittedTransaction(event.getCommitTimestamp(), event.getCommitTimestamp());
- commitEventsToFlush++;
- break;
default:
throw new IllegalStateException("Event not allowed in Persistent Processor Handler: " + event);
}
@@ -124,10 +119,6 @@ public class PersistenceProcessorHandler implements WorkHandler<PersistenceProce
case ABORT:
event.getMonCtx().timerStart("reply.processor.abort.latency");
break;
- case FENCE:
- event.getMonCtx().timerStop("persistence.processor.fence.latency");
- event.getMonCtx().timerStart("reply.processor.fence.latency");
- break;
default:
throw new IllegalStateException("Event not allowed in Persistent Processor Handler: " + event);
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index 628b73d..95d77ba 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -146,10 +146,10 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
}
@Override
- public void addAbortToBatch(long startTimestamp, Channel c, MonitoringContext monCtx)
+ public void addAbortToBatch(long startTimestamp, Channel c, MonitoringContext context)
throws Exception {
- currentBatch.addAbort(startTimestamp, c, monCtx);
+ currentBatch.addAbort(startTimestamp, c, context);
if (currentBatch.isFull()) {
triggerCurrentBatchFlush();
}
@@ -157,19 +157,9 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
}
@Override
- public void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
+ public void addTimestampToBatch(long startTimestamp, Channel c, MonitoringContext context) throws Exception {
- currentBatch.addTimestamp(startTimestamp, c, monCtx);
- if (currentBatch.isFull()) {
- triggerCurrentBatchFlush();
- }
-
- }
-
- @Override
- public void addFenceToBatch(long tableID, long fenceTimestamp, Channel c, MonitoringContext monCtx) throws Exception {
-
- currentBatch.addFence(tableID, fenceTimestamp, c, monCtx);
+ currentBatch.addTimestamp(startTimestamp, c, context);
if (currentBatch.isFull()) {
triggerCurrentBatchFlush();
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
index 7e836aa..f196c42 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessor.java
@@ -67,18 +67,5 @@ interface ReplyProcessor extends Closeable {
void sendTimestampResponse(long startTimestamp, Channel channel);
- /**
- * Allow to send a fence response back to the client.
- *
- * @param tableID
- * the table we are creating the fence for
- * @param fenceTimestamp
- * the fence timestamp to return
- * @param channel
- * the channel used to send the response back to the client
- */
-
- void sendFenceResponse(long tableID, long fenceTimestamp, Channel c);
-
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index 28fe3a0..8e50323 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -67,7 +67,6 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
private final Meter abortMeter;
private final Meter commitMeter;
private final Meter timestampMeter;
- private final Meter fenceMeter;
@Inject
ReplyProcessorImpl(@Named("ReplyStrategy") WaitStrategy strategy,
@@ -101,7 +100,6 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
this.abortMeter = metrics.meter(name("tso", "aborts"));
this.commitMeter = metrics.meter(name("tso", "commits"));
this.timestampMeter = metrics.meter(name("tso", "timestampAllocation"));
- this.fenceMeter = metrics.meter(name("tso", "fences"));
LOG.info("ReplyProcessor initialized");
@@ -130,11 +128,6 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
event.getMonCtx().timerStop("reply.processor.timestamp.latency");
timestampMeter.mark();
break;
- case FENCE:
- sendFenceResponse(event.getStartTimestamp(), event.getCommitTimestamp(), event.getChannel());
- event.getMonCtx().timerStop("reply.processor.fence.latency");
- fenceMeter.mark();
- break;
case COMMIT_RETRY:
throw new IllegalStateException("COMMIT_RETRY events must be filtered before this step: " + event);
default:
@@ -225,18 +218,6 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
}
@Override
- public void sendFenceResponse(long tableID, long fenceTimestamp, Channel c) {
-
- TSOProto.Response.Builder builder = TSOProto.Response.newBuilder();
- TSOProto.FenceResponse.Builder fenceBuilder = TSOProto.FenceResponse.newBuilder();
- fenceBuilder.setTableId(tableID);
- fenceBuilder.setFenceId(fenceTimestamp);
- builder.setFenceResponse(fenceBuilder.build());
- c.write(builder.build());
-
- }
-
- @Override
public void close() {
LOG.info("Terminating Reply Processor...");
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/tso-server/src/main/java/org/apache/omid/tso/RequestProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessor.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessor.java
index 062329d..8ab6c9f 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessor.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessor.java
@@ -27,7 +27,6 @@ public interface RequestProcessor extends TSOStateManager.StateObserver, Closeab
void timestampRequest(Channel c, MonitoringContext monCtx);
- void commitRequest(long startTimestamp, Collection<Long> writeSet, Collection<Long> tableIdSet, boolean isRetry, Channel c, MonitoringContext monCtx);
+ void commitRequest(long startTimestamp, Collection<Long> writeSet, boolean isRetry, Channel c, MonitoringContext monCtx);
- void fenceRequest(long tableID, Channel c, MonitoringContext monCtx);
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
index 04458f1..65416bc 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
@@ -24,7 +24,6 @@ import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import com.lmax.disruptor.TimeoutHandler;
import com.lmax.disruptor.dsl.Disruptor;
-
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.tso.TSOStateManager.TSOState;
import org.jboss.netty.channel.Channel;
@@ -32,14 +31,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
-
import java.io.IOException;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
@@ -60,7 +55,6 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
private final TimestampOracle timestampOracle;
private final CommitHashMap hashmap;
- private final Map<Long, Long> tableFences;
private final MetricsRegistry metrics;
private final PersistenceProcessor persistProc;
@@ -96,7 +90,6 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
this.persistProc = persistProc;
this.timestampOracle = timestampOracle;
this.hashmap = new CommitHashMap(config.getConflictMapSize());
- this.tableFences = new HashMap<Long, Long>();
LOG.info("RequestProcessor initialized");
@@ -123,9 +116,6 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
case COMMIT:
handleCommit(event);
break;
- case FENCE:
- handleFence(event);
- break;
default:
throw new IllegalStateException("Event not allowed in Request Processor: " + event);
}
@@ -157,24 +147,13 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
}
@Override
- public void commitRequest(long startTimestamp, Collection<Long> writeSet, Collection<Long> tableIdSet, boolean isRetry, Channel c,
+ public void commitRequest(long startTimestamp, Collection<Long> writeSet, boolean isRetry, Channel c,
MonitoringContext monCtx) {
monCtx.timerStart("request.processor.commit.latency");
long seq = requestRing.next();
RequestEvent e = requestRing.get(seq);
- RequestEvent.makeCommitRequest(e, startTimestamp, monCtx, writeSet, tableIdSet, isRetry, c);
- requestRing.publish(seq);
-
- }
-
- @Override
- public void fenceRequest(long tableID, Channel c, MonitoringContext monCtx) {
-
- monCtx.timerStart("request.processor.fence.latency");
- long seq = requestRing.next();
- RequestEvent e = requestRing.get(seq);
- RequestEvent.makeFenceRequest(e, tableID, c, monCtx);
+ RequestEvent.makeCommitRequest(e, startTimestamp, monCtx, writeSet, isRetry, c);
requestRing.publish(seq);
}
@@ -187,56 +166,38 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
}
- // Checks whether transaction transactionId started before a fence creation of a table transactionId modified.
- private boolean hasConflictsWithFences(long startTimestamp, Collection<Long> tableIdSet) {
- if (!tableFences.isEmpty()) {
- for (long tableId: tableIdSet) {
- Long fence = tableFences.get(tableId);
- if (fence != null && fence > startTimestamp) {
- return true;
- }
- if (fence != null && fence < lowWatermark) {
- tableFences.remove(tableId); // Garbage collect entries of old fences.
- }
- }
- }
-
- return false;
- }
-
- // Checks whether transactionId has a write-write conflict with a transaction committed after transactionId.
- private boolean hasConflictsWithCommittedTransactions(long startTimestamp, Iterable<Long> writeSet) {
- for (long cellId : writeSet) {
- long value = hashmap.getLatestWriteForCell(cellId);
- if (value != 0 && value >= startTimestamp) {
- return true;
- }
- }
-
- return false;
- }
-
private void handleCommit(RequestEvent event) throws Exception {
long startTimestamp = event.getStartTimestamp();
Iterable<Long> writeSet = event.writeSet();
- Collection<Long> tableIdSet = event.getTableIdSet();
boolean isCommitRetry = event.isCommitRetry();
Channel c = event.getChannel();
- boolean nonEmptyWriteSet = writeSet.iterator().hasNext();
+ boolean txCanCommit;
+
+ int numCellsInWriteset = 0;
+ // 0. check if it should abort
+ if (startTimestamp <= lowWatermark) {
+ txCanCommit = false;
+ } else {
+ // 1. check the write-write conflicts
+ txCanCommit = true;
+ for (long cellId : writeSet) {
+ long value = hashmap.getLatestWriteForCell(cellId);
+ if (value != 0 && value >= startTimestamp) {
+ txCanCommit = false;
+ break;
+ }
+ numCellsInWriteset++;
+ }
+ }
- // If the transaction started before the low watermark, or
- // it started before a fence and modified the table the fence created for, or
- // it has a write-write conflict with a transaction committed after it started
- // Then it should abort. Otherwise, it can commit.
- if (startTimestamp > lowWatermark &&
- !hasConflictsWithFences(startTimestamp, tableIdSet) &&
- !hasConflictsWithCommittedTransactions(startTimestamp, writeSet)) {
+ if (txCanCommit) {
+ // 2. commit
long commitTimestamp = timestampOracle.next();
- if (nonEmptyWriteSet) {
+ if (numCellsInWriteset > 0) {
long newLowWatermark = lowWatermark;
for (long r : writeSet) {
@@ -266,16 +227,6 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
}
- private void handleFence(RequestEvent event) throws Exception {
- long tableID = event.getTableId();
- Channel c = event.getChannel();
-
- long fenceTimestamp = timestampOracle.next();
-
- tableFences.put(tableID, fenceTimestamp);
- persistProc.addFenceToBatch(tableID, fenceTimestamp, c, event.getMonCtx());
- }
-
@Override
public void close() throws IOException {
@@ -298,7 +249,7 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
final static class RequestEvent implements Iterable<Long> {
enum Type {
- TIMESTAMP, COMMIT, FENCE
+ TIMESTAMP, COMMIT
}
private Type type = null;
@@ -313,9 +264,6 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
private Long writeSet[] = new Long[MAX_INLINE];
private Collection<Long> writeSetAsCollection = null; // for the case where there's more than MAX_INLINE
- private Collection<Long> tableIdSet = null;
- private long tableID = 0;
-
static void makeTimestampRequest(RequestEvent e, Channel c, MonitoringContext monCtx) {
e.type = Type.TIMESTAMP;
e.channel = c;
@@ -326,7 +274,6 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
long startTimestamp,
MonitoringContext monCtx,
Collection<Long> writeSet,
- Collection<Long> TableIdSet,
boolean isRetry,
Channel c) {
e.monCtx = monCtx;
@@ -343,20 +290,10 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
int i = 0;
for (Long cellId : writeSet) {
e.writeSet[i] = cellId;
- ++i;
+ i++;
}
}
- e.tableIdSet = TableIdSet;
- }
- static void makeFenceRequest(RequestEvent e,
- long tableID,
- Channel c,
- MonitoringContext monCtx) {
- e.type = Type.FENCE;
- e.channel = c;
- e.monCtx = monCtx;
- e.tableID = tableID;
}
MonitoringContext getMonCtx() {
@@ -375,14 +312,6 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
return channel;
}
- Collection<Long> getTableIdSet() {
- return tableIdSet;
- }
-
- long getTableId() {
- return tableID;
- }
-
@Override
public Iterator<Long> iterator() {
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java b/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
index a218a1d..fe99880 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOChannelHandler.java
@@ -170,13 +170,9 @@ public class TSOChannelHandler extends SimpleChannelHandler implements Closeable
TSOProto.CommitRequest cr = request.getCommitRequest();
requestProcessor.commitRequest(cr.getStartTimestamp(),
cr.getCellIdList(),
- cr.getTableIdList(),
cr.getIsRetry(),
ctx.getChannel(),
new MonitoringContext(metrics));
- } else if (request.hasFenceRequest()) {
- TSOProto.FenceRequest fr = request.getFenceRequest();
- requestProcessor.fenceRequest(fr.getTableId(), ctx.getChannel(), new MonitoringContext(metrics));
} else {
LOG.error("Invalid request {}. Closing channel {}", request, ctx.getChannel());
ctx.getChannel().close();
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java b/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
index 4d0d844..a7aec27 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
@@ -24,9 +24,6 @@ import com.google.inject.Provides;
import javax.inject.Named;
import javax.inject.Singleton;
-
-import org.apache.omid.tso.TSOServerConfig.TIMESTAMP_TYPE;
-
import java.net.SocketException;
import java.net.UnknownHostException;
@@ -46,13 +43,7 @@ class TSOModule extends AbstractModule {
bind(TSOChannelHandler.class).in(Singleton.class);
bind(TSOStateManager.class).to(TSOStateManagerImpl.class).in(Singleton.class);
-
- if (config.getTimestampTypeEnum() == TIMESTAMP_TYPE.WORLD_TIME) {
- bind(TimestampOracle.class).to(WorldClockOracleImpl.class).in(Singleton.class);
- } else {
- bind(TimestampOracle.class).to(TimestampOracleImpl.class).in(Singleton.class);
- }
-
+ bind(TimestampOracle.class).to(TimestampOracleImpl.class).in(Singleton.class);
bind(Panicker.class).to(SystemExitPanicker.class).in(Singleton.class);
install(new BatchPoolModule(config));
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
index 8f061a1..3292211 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
@@ -44,11 +44,6 @@ public class TSOServerConfig extends SecureHBaseConfig {
LOW_CPU
};
- public static enum TIMESTAMP_TYPE {
- INCREMENTAL,
- WORLD_TIME
- };
-
// ----------------------------------------------------------------------------------------------------------------
// Instantiation
// ----------------------------------------------------------------------------------------------------------------
@@ -87,8 +82,6 @@ public class TSOServerConfig extends SecureHBaseConfig {
private String networkIfaceName = NetworkUtils.getDefaultNetworkInterface();
- private String timestampType;
-
public int getPort() {
return port;
}
@@ -137,18 +130,6 @@ public class TSOServerConfig extends SecureHBaseConfig {
this.networkIfaceName = networkIfaceName;
}
- public String getTimestampType() {
- return timestampType;
- }
-
- public void setTimestampType(String type) {
- this.timestampType = type;
- }
-
- public TIMESTAMP_TYPE getTimestampTypeEnum() {
- return TSOServerConfig.TIMESTAMP_TYPE.valueOf(timestampType);
- }
-
public Module getTimestampStoreModule() {
return timestampStoreModule;
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java b/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
index 454526f..0a65c01 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TimestampOracleImpl.java
@@ -19,17 +19,14 @@ package org.apache.omid.tso;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
import org.apache.omid.metrics.Gauge;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.timestamp.storage.TimestampStorage;
-import org.apache.omid.transaction.AbstractTransactionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Singleton;
-
import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -137,12 +134,9 @@ public class TimestampOracleImpl implements TimestampOracle {
@SuppressWarnings("StatementWithEmptyBody")
@Override
public long next() {
- lastTimestamp += AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
+ lastTimestamp++;
- if (lastTimestamp >= nextAllocationThreshold) {
- // set the nextAllocationThread to max value of long in order to
- // make sure only one call to this function will execute a thread to extend the timestamp batch.
- nextAllocationThreshold = Long.MAX_VALUE;
+ if (lastTimestamp == nextAllocationThreshold) {
executor.execute(allocateTimestampsBatchTask);
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/tso-server/src/main/java/org/apache/omid/tso/WorldClockOracleImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/WorldClockOracleImpl.java b/tso-server/src/main/java/org/apache/omid/tso/WorldClockOracleImpl.java
deleted file mode 100644
index 4a9c5b5..0000000
--- a/tso-server/src/main/java/org/apache/omid/tso/WorldClockOracleImpl.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.omid.tso;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.omid.metrics.Gauge;
-import org.apache.omid.metrics.MetricsRegistry;
-import org.apache.omid.timestamp.storage.TimestampStorage;
-import org.apache.omid.transaction.AbstractTransactionManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.inject.Inject;
-import javax.inject.Singleton;
-
-import java.io.IOException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.omid.metrics.MetricsUtils.name;
-
-/**
- * The Timestamp Oracle that gives monotonically increasing timestamps based on world time
- */
-@Singleton
-public class WorldClockOracleImpl implements TimestampOracle {
-
- private static final Logger LOG = LoggerFactory.getLogger(WorldClockOracleImpl.class);
-
- static final long MAX_TX_PER_MS = 1_000_000; // 1 million
- static final long TIMESTAMP_INTERVAL_MS = 10_000; // 10 seconds interval
- private static final long TIMESTAMP_ALLOCATION_INTERVAL_MS = 7_000; // 7 seconds
-
- private long lastTimestamp;
- private long maxTimestamp;
-
- private TimestampStorage storage;
- private Panicker panicker;
-
- private volatile long maxAllocatedTime;
-
- private final ScheduledExecutorService scheduler =
- Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("ts-persist-%d").build());
-
- private Runnable allocateTimestampsBatchTask;
-
- private class AllocateTimestampBatchTask implements Runnable {
- long previousMaxTime;
-
- AllocateTimestampBatchTask(long previousMaxTime) {
- this.previousMaxTime = previousMaxTime;
- }
-
- @Override
- public void run() {
- long newMaxTime = (System.currentTimeMillis() + TIMESTAMP_INTERVAL_MS) * MAX_TX_PER_MS;
- try {
- storage.updateMaxTimestamp(previousMaxTime, newMaxTime);
- maxAllocatedTime = newMaxTime;
- previousMaxTime = newMaxTime;
- } catch (Throwable e) {
- panicker.panic("Can't store the new max timestamp", e);
- }
- }
- }
-
- @Inject
- public WorldClockOracleImpl(MetricsRegistry metrics,
- TimestampStorage tsStorage,
- Panicker panicker) throws IOException {
-
- this.storage = tsStorage;
- this.panicker = panicker;
-
- metrics.gauge(name("tso", "maxTimestamp"), new Gauge<Long>() {
- @Override
- public Long getValue() {
- return maxTimestamp;
- }
- });
-
- }
-
- @Override
- public void initialize() throws IOException {
-
- this.lastTimestamp = this.maxTimestamp = storage.getMaxTimestamp();
-
- this.allocateTimestampsBatchTask = new AllocateTimestampBatchTask(lastTimestamp);
-
- // Trigger first allocation of timestamps
- scheduler.schedule(allocateTimestampsBatchTask, 0, TimeUnit.MILLISECONDS);
-
- // Waiting for the current epoch to start. Occurs in case of failover when the previous TSO allocated the current time frame.
- while ((System.currentTimeMillis() * MAX_TX_PER_MS) < this.lastTimestamp) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- continue;
- }
- }
-
- // Launch the periodic timestamp interval allocation. In this case, the timestamp interval is extended even though the TSO is idle.
- // Because we are world time based, this guarantees that the first request after a long time does not need to wait for new interval allocation.
- scheduler.scheduleAtFixedRate(allocateTimestampsBatchTask, TIMESTAMP_ALLOCATION_INTERVAL_MS, TIMESTAMP_ALLOCATION_INTERVAL_MS, TimeUnit.MILLISECONDS);
- }
-
- /**
- * Returns the next timestamp if available. Otherwise spins till the ts-persist thread allocates a new timestamp.
- */
- @Override
- public long next() {
-
- long currentMsFirstTimestamp = System.currentTimeMillis() * MAX_TX_PER_MS;
-
- lastTimestamp += AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
-
- // Return the next timestamp in case we are still in the same millisecond as the previous timestamp was.
- if (lastTimestamp >= currentMsFirstTimestamp) {
- return lastTimestamp;
- }
-
- if (currentMsFirstTimestamp >= maxTimestamp) { // Intentional race to reduce synchronization overhead in every access to maxTimestamp
- while (maxAllocatedTime <= currentMsFirstTimestamp) { // Waiting for the interval allocation
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- continue;
- }
- }
- assert (maxAllocatedTime > maxTimestamp);
- maxTimestamp = maxAllocatedTime;
- }
-
- lastTimestamp = currentMsFirstTimestamp;
-
- return lastTimestamp;
- }
-
- @Override
- public long getLast() {
- return lastTimestamp;
- }
-
- @Override
- public String toString() {
- return String.format("TimestampOracle -> LastTimestamp: %d, MaxTimestamp: %d", lastTimestamp, maxTimestamp);
- }
-
- @VisibleForTesting
- static class InMemoryTimestampStorage implements TimestampStorage {
-
- long maxTime = 0;
-
- @Override
- public void updateMaxTimestamp(long previousMaxTime, long nextMaxTime) {
- maxTime = nextMaxTime;
- LOG.info("Updating max timestamp: (previous:{}, new:{})", previousMaxTime, nextMaxTime);
- }
-
- @Override
- public long getMaxTimestamp() {
- return maxTime;
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/tso-server/src/main/resources/default-omid-server-configuration.yml
----------------------------------------------------------------------
diff --git a/tso-server/src/main/resources/default-omid-server-configuration.yml b/tso-server/src/main/resources/default-omid-server-configuration.yml
index 4e45122..017af4f 100644
--- a/tso-server/src/main/resources/default-omid-server-configuration.yml
+++ b/tso-server/src/main/resources/default-omid-server-configuration.yml
@@ -26,10 +26,7 @@ numConcurrentCTWriters: 2
batchSizePerCTWriter: 25
# When this timeout expires, the contents of the batch are flushed to the datastore
batchPersistTimeoutInMs: 10
-# Timestamp generation strategy
-# INCREMENTAL - [Default] regular counter
-# WORLD_TIME - world time based counter
-timestampType: INCREMENTAL
+
# Default module configuration (No TSO High Availability & in-memory storage for timestamp and commit tables)
timestampStoreModule: !!org.apache.omid.tso.InMemoryTimestampStorageModule [ ]
commitTableStoreModule: !!org.apache.omid.tso.InMemoryCommitTableStorageModule [ ]
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
index 1c44d05..405102a 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRequestProcessor.java
@@ -19,10 +19,8 @@ package org.apache.omid.tso;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
-
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.metrics.NullMetricsProvider;
-import org.apache.omid.transaction.AbstractTransactionManager;
import org.jboss.netty.channel.Channel;
import org.mockito.ArgumentCaptor;
import org.slf4j.Logger;
@@ -30,7 +28,6 @@ import org.slf4j.LoggerFactory;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -65,9 +62,6 @@ public class TestRequestProcessor {
// Build the required scaffolding for the test
MetricsRegistry metrics = new NullMetricsProvider();
- TSOServerConfig config = new TSOServerConfig();
- config.setConflictMapSize(CONFLICT_MAP_SIZE);
-
TimestampOracleImpl timestampOracle =
new TimestampOracleImpl(metrics, new TimestampOracleImpl.InMemoryTimestampStorage(), new MockPanicker());
@@ -78,6 +72,9 @@ public class TestRequestProcessor {
f.set(null);
doReturn(f).when(persist).persistLowWatermark(any(Long.class));
+ TSOServerConfig config = new TSOServerConfig();
+ config.setConflictMapSize(CONFLICT_MAP_SIZE);
+
requestProc = new RequestProcessorImpl(metrics, timestampOracle, persist, new MockPanicker(), config);
// Initialize the state for the experiment
@@ -98,8 +95,7 @@ public class TestRequestProcessor {
// verify that timestamps increase monotonically
for (int i = 0; i < 100; i++) {
requestProc.timestampRequest(null, new MonitoringContext(metrics));
- verify(persist, timeout(100).times(1)).addTimestampToBatch(eq(firstTS), any(Channel.class), any(MonitoringContext.class));
- firstTS += AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
+ verify(persist, timeout(100).times(1)).addTimestampToBatch(eq(firstTS++), any(Channel.class), any(MonitoringContext.class));
}
}
@@ -114,10 +110,10 @@ public class TestRequestProcessor {
long firstTS = TScapture.getValue();
List<Long> writeSet = Lists.newArrayList(1L, 20L, 203L);
- requestProc.commitRequest(firstTS - AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
- verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN), any(Channel.class), any(MonitoringContext.class));
+ requestProc.commitRequest(firstTS - 1, writeSet, false, null, new MonitoringContext(metrics));
+ verify(persist, timeout(100).times(1)).addAbortToBatch(eq(firstTS - 1), any(Channel.class), any(MonitoringContext.class));
- requestProc.commitRequest(firstTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
+ requestProc.commitRequest(firstTS, writeSet, false, null, new MonitoringContext(metrics));
ArgumentCaptor<Long> commitTScapture = ArgumentCaptor.forClass(Long.class);
verify(persist, timeout(100).times(1)).addCommitToBatch(eq(firstTS), commitTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
@@ -136,24 +132,14 @@ public class TestRequestProcessor {
TScapture.capture(), any(Channel.class), any(MonitoringContext.class));
long thirdTS = TScapture.getValue();
- requestProc.commitRequest(thirdTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
+ requestProc.commitRequest(thirdTS, writeSet, false, null, new MonitoringContext(metrics));
verify(persist, timeout(100).times(1)).addCommitToBatch(eq(thirdTS), anyLong(), any(Channel.class), any(MonitoringContext.class));
- requestProc.commitRequest(secondTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
+ requestProc.commitRequest(secondTS, writeSet, false, null, new MonitoringContext(metrics));
verify(persist, timeout(100).times(1)).addAbortToBatch(eq(secondTS), any(Channel.class), any(MonitoringContext.class));
}
@Test(timeOut = 30_000)
- public void testFence() throws Exception {
-
- requestProc.fenceRequest(666L, null, new MonitoringContext(metrics));
- ArgumentCaptor<Long> firstTScapture = ArgumentCaptor.forClass(Long.class);
- verify(persist, timeout(100).times(1)).addFenceToBatch(eq(666L),
- firstTScapture.capture(), any(Channel.class), any(MonitoringContext.class));
-
- }
-
- @Test(timeOut = 30_000)
public void testCommitRequestAbortsWhenResettingRequestProcessorState() throws Exception {
List<Long> writeSet = Collections.emptyList();
@@ -171,7 +157,7 @@ public class TestRequestProcessor {
stateManager.initialize();
// ...check that the transaction is aborted when trying to commit
- requestProc.commitRequest(startTS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
+ requestProc.commitRequest(startTS, writeSet, false, null, new MonitoringContext(metrics));
verify(persist, timeout(100).times(1)).addAbortToBatch(eq(startTS), any(Channel.class), any(MonitoringContext.class));
}
@@ -180,21 +166,21 @@ public class TestRequestProcessor {
public void testLowWatermarkIsStoredOnlyWhenACacheElementIsEvicted() throws Exception {
final int ANY_START_TS = 1;
- final long FIRST_COMMIT_TS_EVICTED = AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
- final long NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED = FIRST_COMMIT_TS_EVICTED + AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN;
+ final long FIRST_COMMIT_TS_EVICTED = 1L;
+ final long NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED = 2L;
// Fill the cache to provoke a cache eviction
for (long i = 0; i < CONFLICT_MAP_SIZE + CONFLICT_MAP_ASSOCIATIVITY; i++) {
long writeSetElementHash = i + 1; // This is to match the assigned CT: K/V in cache = WS Element Hash/CT
List<Long> writeSet = Lists.newArrayList(writeSetElementHash);
- requestProc.commitRequest(ANY_START_TS, writeSet, new ArrayList<Long>(0), false, null, new MonitoringContext(metrics));
+ requestProc.commitRequest(ANY_START_TS, writeSet, false, null, new MonitoringContext(metrics));
}
Thread.currentThread().sleep(3000); // Allow the Request processor to finish the request processing
// Check that first time its called is on init
verify(persist, timeout(100).times(1)).persistLowWatermark(eq(0L));
- // Then, check it is called when cache is full and the first element is evicted (should be a AbstractTransactionManager.NUM_OF_CHECKPOINTS)
+ // Then, check it is called when cache is full and the first element is evicted (should be a 1)
verify(persist, timeout(100).times(1)).persistLowWatermark(eq(FIRST_COMMIT_TS_EVICTED));
// Finally it should never be called with the next element
verify(persist, timeout(100).never()).persistLowWatermark(eq(NEXT_COMMIT_TS_THAT_SHOULD_BE_EVICTED));
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
index 157bb48..968f4a9 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTSOChannelHandlerNetty.java
@@ -248,8 +248,6 @@ public class TestTSOChannelHandlerNetty {
testWritingTimestampRequest(channel);
testWritingCommitRequest(channel);
-
- testWritingFenceRequest(channel);
}
private void testWritingTimestampRequest(Channel channel) throws InterruptedException {
@@ -262,7 +260,7 @@ public class TestTSOChannelHandlerNetty {
channel.write(tsBuilder.build()).await();
verify(requestProcessor, timeout(100).times(1)).timestampRequest(any(Channel.class), any(MonitoringContext.class));
verify(requestProcessor, timeout(100).never())
- .commitRequest(anyLong(), anyCollectionOf(Long.class), anyCollectionOf(Long.class), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
+ .commitRequest(anyLong(), anyCollectionOf(Long.class), anyBoolean(), any(Channel.class), any(MonitoringContext.class));
}
private void testWritingCommitRequest(Channel channel) throws InterruptedException {
@@ -279,23 +277,7 @@ public class TestTSOChannelHandlerNetty {
channel.write(commitBuilder.build()).await();
verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContext.class));
verify(requestProcessor, timeout(100).times(1))
- .commitRequest(eq(666L), anyCollectionOf(Long.class), anyCollectionOf(Long.class), eq(false), any(Channel.class), any(MonitoringContext.class));
- }
-
- private void testWritingFenceRequest(Channel channel) throws InterruptedException {
- // Reset mock
- reset(requestProcessor);
- TSOProto.Request.Builder fenceBuilder = TSOProto.Request.newBuilder();
- TSOProto.FenceRequest.Builder fenceRequestBuilder = TSOProto.FenceRequest.newBuilder();
- fenceRequestBuilder.setTableId(666);
- fenceBuilder.setFenceRequest(fenceRequestBuilder.build());
- TSOProto.Request r = fenceBuilder.build();
- assertTrue(r.hasFenceRequest());
- // Write into the channel
- channel.write(fenceBuilder.build()).await();
- verify(requestProcessor, timeout(100).never()).timestampRequest(any(Channel.class), any(MonitoringContext.class));
- verify(requestProcessor, timeout(100).times(1))
- .fenceRequest(eq(666L), any(Channel.class), any(MonitoringContext.class));
+ .commitRequest(eq(666L), anyCollectionOf(Long.class), eq(false), any(Channel.class), any(MonitoringContext.class));
}
// ----------------------------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java b/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
index a5f236c..c75e95b 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestTimestampOracle.java
@@ -19,7 +19,6 @@ package org.apache.omid.tso;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.timestamp.storage.TimestampStorage;
-import org.apache.omid.transaction.AbstractTransactionManager;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@@ -52,8 +51,6 @@ public class TestTimestampOracle {
private Panicker panicker;
@Mock
private TimestampStorage timestampStorage;
- @Mock
- TSOServerConfig config;
// Component under test
@InjectMocks
@@ -73,7 +70,7 @@ public class TestTimestampOracle {
long last = timestampOracle.next();
for (int i = 0; i < (3 * TimestampOracleImpl.TIMESTAMP_BATCH); i++) {
long current = timestampOracle.next();
- assertEquals(current, last + AbstractTransactionManager.MAX_CHECKPOINTS_PER_TXN, "Not monotonic growth");
+ assertEquals(current, last + 1, "Not monotonic growth");
last = current;
}
assertTrue(timestampOracle.getLast() == last);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/31dd269d/tso-server/src/test/java/org/apache/omid/tso/TestWorldTimeOracle.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestWorldTimeOracle.java b/tso-server/src/test/java/org/apache/omid/tso/TestWorldTimeOracle.java
deleted file mode 100644
index df59530..0000000
--- a/tso-server/src/test/java/org/apache/omid/tso/TestWorldTimeOracle.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.omid.tso;
-
-import org.apache.omid.metrics.MetricsRegistry;
-import org.apache.omid.timestamp.storage.TimestampStorage;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.io.IOException;
-import java.util.concurrent.CountDownLatch;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-public class TestWorldTimeOracle {
-
- private static final Logger LOG = LoggerFactory.getLogger(TestWorldTimeOracle.class);
-
- @Mock
- private MetricsRegistry metrics;
- @Mock
- private Panicker panicker;
- @Mock
- private TimestampStorage timestampStorage;
- @Mock
- private TSOServerConfig config;
-
- // Component under test
- @InjectMocks
- private WorldClockOracleImpl worldClockOracle;
-
- @BeforeMethod(alwaysRun = true, timeOut = 30_000)
- public void initMocksAndComponents() {
- MockitoAnnotations.initMocks(this);
- }
-
- @Test(timeOut = 30_000)
- public void testMonotonicTimestampGrowth() throws Exception {
-
- // Intialize component under test
- worldClockOracle.initialize();
-
- long last = worldClockOracle.next();
-
- int timestampIntervalSec = (int) (WorldClockOracleImpl.TIMESTAMP_INTERVAL_MS / 1000) * 2;
- for (int i = 0; i < timestampIntervalSec; i++) {
- long current = worldClockOracle.next();
- assertTrue(current > last+1 , "Timestamp should be based on world time");
- last = current;
- Thread.sleep(1000);
- }
-
- assertTrue(worldClockOracle.getLast() == last);
- LOG.info("Last timestamp: {}", last);
- }
-
- @Test(timeOut = 10_000)
- public void testTimestampOraclePanicsWhenTheStorageHasProblems() throws Exception {
-
- // Intialize component under test
- worldClockOracle.initialize();
-
- // Cause an exception when updating the max timestamp
- final CountDownLatch updateMaxTimestampMethodCalled = new CountDownLatch(1);
- doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
- updateMaxTimestampMethodCalled.countDown();
- throw new RuntimeException("Out of memory or something");
- }
- }).when(timestampStorage).updateMaxTimestamp(anyLong(), anyLong());
-
- // Make the previous exception to be thrown
- Thread allocThread = new Thread("AllocThread") {
- @Override
- public void run() {
- while (true) {
- worldClockOracle.next();
- }
- }
- };
- allocThread.start();
-
- updateMaxTimestampMethodCalled.await();
-
- // Verify that it has blown up
- verify(panicker, atLeastOnce()).panic(anyString(), any(Throwable.class));
- }
-
-}