You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by ik...@apache.org on 2016/04/21 02:02:27 UTC
[10/52] [abbrv] incubator-omid git commit: Rename tsoclient package
to tso.client
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/transaction-client/src/main/java/com/yahoo/omid/tso/client/TSOFuture.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/com/yahoo/omid/tso/client/TSOFuture.java b/transaction-client/src/main/java/com/yahoo/omid/tso/client/TSOFuture.java
new file mode 100644
index 0000000..8f42353
--- /dev/null
+++ b/transaction-client/src/main/java/com/yahoo/omid/tso/client/TSOFuture.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.omid.tso.client;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+
+public interface TSOFuture<T> extends Future<T> {
+ public void addListener(Runnable listener, Executor executor);
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/transaction-client/src/main/java/com/yahoo/omid/tso/client/TSOProtocol.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/com/yahoo/omid/tso/client/TSOProtocol.java b/transaction-client/src/main/java/com/yahoo/omid/tso/client/TSOProtocol.java
new file mode 100644
index 0000000..cb9e679
--- /dev/null
+++ b/transaction-client/src/main/java/com/yahoo/omid/tso/client/TSOProtocol.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.omid.tso.client;
+
+import java.util.Set;
+
+/**
+ * Defines the protocol used on the client side to abstract communication to the TSO server
+ */
+public interface TSOProtocol {
+
+ /**
+ * Returns a new timestamp assigned by on the server-side
+ * @return the newly assigned timestamp as a future. If an error was detected, the future will contain a
+ * corresponding protocol exception
+ * @see TimestampOracle
+ * @see TSOServer
+ */
+ TSOFuture<Long> getNewStartTimestamp();
+
+ /**
+ * Returns the result of the conflict detection made on the server-side for the specified transaction
+ * @param transactionId
+ * the transaction to check for conflicts
+ * @param writeSet
+ * the writeSet of the transaction, which includes all the modified cells
+ * @return the commit timestamp as a future if the transaction was committed. If the transaction was aborted due
+ * to conflicts with a concurrent transaction, the future will include an AbortException. If an error was detected,
+ * the future will contain a corresponding protocol exception
+ * @see TimestampOracle
+ * @see TSOServer
+ */
+ TSOFuture<Long> commit(long transactionId, Set<? extends CellId> writeSet);
+
+ /**
+ * Closes the communication with the TSO server
+ * @return nothing. If an error was detected, the future will contain a corresponding protocol exception
+ */
+ TSOFuture<Void> close();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/transaction-client/src/main/java/com/yahoo/omid/tso/util/DummyCellIdImpl.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/com/yahoo/omid/tso/util/DummyCellIdImpl.java b/transaction-client/src/main/java/com/yahoo/omid/tso/util/DummyCellIdImpl.java
index 600dc51..d4421e0 100644
--- a/transaction-client/src/main/java/com/yahoo/omid/tso/util/DummyCellIdImpl.java
+++ b/transaction-client/src/main/java/com/yahoo/omid/tso/util/DummyCellIdImpl.java
@@ -17,7 +17,7 @@
*/
package com.yahoo.omid.tso.util;
-import com.yahoo.omid.tsoclient.CellId;
+import com.yahoo.omid.tso.client.CellId;
public class DummyCellIdImpl implements CellId {
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/transaction-client/src/main/java/com/yahoo/omid/tsoclient/AbortException.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/AbortException.java b/transaction-client/src/main/java/com/yahoo/omid/tsoclient/AbortException.java
deleted file mode 100644
index 5a4d43c..0000000
--- a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/AbortException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tsoclient;
-
-/**
- * Thrown when the TSO server has aborted a transaction
- */
-public class AbortException extends Exception {
-
- private static final long serialVersionUID = 1861474360100681040L;
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/transaction-client/src/main/java/com/yahoo/omid/tsoclient/CellId.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/CellId.java b/transaction-client/src/main/java/com/yahoo/omid/tsoclient/CellId.java
deleted file mode 100644
index eab757e..0000000
--- a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/CellId.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tsoclient;
-
-public interface CellId {
-
- public long getCellId();
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/transaction-client/src/main/java/com/yahoo/omid/tsoclient/ClosingException.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/ClosingException.java b/transaction-client/src/main/java/com/yahoo/omid/tsoclient/ClosingException.java
deleted file mode 100644
index d257deb..0000000
--- a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/ClosingException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tsoclient;
-
-/**
- * Thrown when an error is produced when performing the actions required
- * to close the communication with the TSO server
- */
-public class ClosingException extends Exception {
-
- private static final long serialVersionUID = -5681694952053689884L;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/transaction-client/src/main/java/com/yahoo/omid/tsoclient/ConnectionException.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/ConnectionException.java b/transaction-client/src/main/java/com/yahoo/omid/tsoclient/ConnectionException.java
deleted file mode 100644
index e20dd06..0000000
--- a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/ConnectionException.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tsoclient;
-
-import java.io.IOException;
-
-/**
- * Thrown when there are problems with the comm channel with the TSO server
- * (e.g. when it is closed or disconnected)
- */
-public class ConnectionException extends IOException {
-
- private static final long serialVersionUID = -8489539195700267443L;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/transaction-client/src/main/java/com/yahoo/omid/tsoclient/ForwardingTSOFuture.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/ForwardingTSOFuture.java b/transaction-client/src/main/java/com/yahoo/omid/tsoclient/ForwardingTSOFuture.java
deleted file mode 100644
index a2b889e..0000000
--- a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/ForwardingTSOFuture.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tsoclient;
-
-import com.google.common.util.concurrent.ListenableFuture;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-public class ForwardingTSOFuture<T> implements TSOFuture<T> {
- private final ListenableFuture<T> future;
-
- public ForwardingTSOFuture(ListenableFuture<T> future) {
- this.future = future;
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- return future.cancel(mayInterruptIfRunning);
- }
-
- @Override
- public boolean isCancelled() {
- return future.isCancelled();
- }
-
- @Override
- public boolean isDone() {
- return future.isDone();
- }
-
- @Override
- public T get() throws InterruptedException, ExecutionException {
- return future.get();
- }
-
- @Override
- public T get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- return future.get(timeout, unit);
- }
-
- @Override
- public void addListener(Runnable listener, Executor executor) {
- future.addListener(listener, executor);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/transaction-client/src/main/java/com/yahoo/omid/tsoclient/HandshakeFailedException.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/HandshakeFailedException.java b/transaction-client/src/main/java/com/yahoo/omid/tsoclient/HandshakeFailedException.java
deleted file mode 100644
index dc28656..0000000
--- a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/HandshakeFailedException.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tsoclient;
-
-/**
- * Thrown when some incompatibilities between the TSO client & server are
- * found
- */
-public class HandshakeFailedException extends Exception {
-
- private static final long serialVersionUID = 8545505066920548834L;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/transaction-client/src/main/java/com/yahoo/omid/tsoclient/MockTSOClient.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/MockTSOClient.java b/transaction-client/src/main/java/com/yahoo/omid/tsoclient/MockTSOClient.java
deleted file mode 100644
index a3e5653..0000000
--- a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/MockTSOClient.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tsoclient;
-
-import com.google.common.util.concurrent.SettableFuture;
-import com.yahoo.omid.committable.CommitTable;
-
-import java.io.IOException;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class MockTSOClient implements TSOProtocol {
- private final AtomicLong timestampGenerator = new AtomicLong();
- private final int CONFLICT_MAP_SIZE = 1 * 1000 * 1000;
- private final long[] conflictMap = new long[CONFLICT_MAP_SIZE];
- private final AtomicLong lwm = new AtomicLong();
-
- private final CommitTable.Writer commitTable;
-
- public MockTSOClient(CommitTable.Writer commitTable) {
- this.commitTable = commitTable;
- }
-
- @Override
- public TSOFuture<Long> getNewStartTimestamp() {
- synchronized (conflictMap) {
- SettableFuture<Long> f = SettableFuture.<Long>create();
- f.set(timestampGenerator.incrementAndGet());
- return new ForwardingTSOFuture<Long>(f);
- }
- }
-
- @Override
- public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells) {
- synchronized (conflictMap) {
- SettableFuture<Long> f = SettableFuture.<Long>create();
- if (transactionId < lwm.get()) {
- f.setException(new AbortException());
- return new ForwardingTSOFuture<Long>(f);
- }
-
- boolean canCommit = true;
- for (CellId c : cells) {
- int index = Math.abs((int) (c.getCellId() % CONFLICT_MAP_SIZE));
- if (conflictMap[index] >= transactionId) {
- canCommit = false;
- break;
- }
- }
-
- if (canCommit) {
- long commitTimestamp = timestampGenerator.incrementAndGet();
- for (CellId c : cells) {
- int index = Math.abs((int) (c.getCellId() % CONFLICT_MAP_SIZE));
- long oldVal = conflictMap[index];
- conflictMap[index] = commitTimestamp;
- long curLwm = lwm.get();
- while (oldVal > curLwm) {
- if (lwm.compareAndSet(curLwm, oldVal)) {
- break;
- }
- curLwm = lwm.get();
- }
- }
-
- f.set(commitTimestamp);
- try {
- commitTable.addCommittedTransaction(transactionId, commitTimestamp);
- commitTable.updateLowWatermark(lwm.get());
- commitTable.flush();
- } catch (IOException ioe) {
- f.setException(ioe);
- }
- } else {
- f.setException(new AbortException());
- }
- return new ForwardingTSOFuture<Long>(f);
- }
- }
-
- @Override
- public TSOFuture<Void> close() {
- SettableFuture<Void> f = SettableFuture.<Void>create();
- f.set(null);
- return new ForwardingTSOFuture<Void>(f);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/transaction-client/src/main/java/com/yahoo/omid/tsoclient/NewTSOException.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/NewTSOException.java b/transaction-client/src/main/java/com/yahoo/omid/tsoclient/NewTSOException.java
deleted file mode 100644
index 735f51c..0000000
--- a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/NewTSOException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tsoclient;
-
-/**
- * Thrown when a new TSO has been detected
- */
-public class NewTSOException extends Exception {
-
- private static final long serialVersionUID = -3250655858200759321L;
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/transaction-client/src/main/java/com/yahoo/omid/tsoclient/OmidClientConfiguration.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/OmidClientConfiguration.java b/transaction-client/src/main/java/com/yahoo/omid/tsoclient/OmidClientConfiguration.java
deleted file mode 100644
index 566964a..0000000
--- a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/OmidClientConfiguration.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tsoclient;
-
-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-import com.yahoo.omid.YAMLUtils;
-
-/**
- * Configuration for Omid client side
- */
-public class OmidClientConfiguration {
-
- private static final String DEFAULT_CONFIG_FILE_NAME = "omid-client-config.yml";
-
- public enum ConnType {DIRECT, HA}
-
- public enum PostCommitMode {SYNC, ASYNC}
-
- // Basic connection related params
-
- private ConnType connectionType = ConnType.DIRECT;
- private String connectionString;
- private String zkCurrentTsoPath;
- private String zkNamespace;
- private int zkConnectionTimeoutInSecs;
-
- // Communication protocol related params
-
- private int requestMaxRetries;
- private int requestTimeoutInMs;
- private int reconnectionDelayInSecs;
- private int retryDelayInMs;
- private int executorThreads;
-
- // Transaction Manager related params
-
- private PostCommitMode postCommitMode = PostCommitMode.SYNC;
-
- // ----------------------------------------------------------------------------------------------------------------
- // Instantiation
- // ----------------------------------------------------------------------------------------------------------------
-
- public OmidClientConfiguration() {
- new YAMLUtils().loadSettings(DEFAULT_CONFIG_FILE_NAME, this);
- }
-
- // ----------------------------------------------------------------------------------------------------------------
- // Getters and setters for config params
- // ----------------------------------------------------------------------------------------------------------------
-
- public ConnType getConnectionType() {
- return connectionType;
- }
-
- @Inject(optional = true)
- @Named("omid.client.connectionType")
- public void setConnectionType(ConnType connectionType) {
- this.connectionType = connectionType;
- }
-
- public String getConnectionString() {
- return connectionString;
- }
-
- @Inject(optional = true)
- @Named("omid.client.connectionString")
- public void setConnectionString(String connectionString) {
- this.connectionString = connectionString;
- }
-
- public int getZkConnectionTimeoutInSecs() {
- return zkConnectionTimeoutInSecs;
- }
-
- @Inject(optional = true)
- @Named("omid.client.zkConnectionTimeoutInSecs")
- public void setZkConnectionTimeoutInSecs(int zkConnectionTimeoutInSecs) {
- this.zkConnectionTimeoutInSecs = zkConnectionTimeoutInSecs;
- }
-
- public int getRequestMaxRetries() {
- return requestMaxRetries;
- }
-
- @Inject(optional = true)
- @Named("omid.client.requestMaxRetries")
- public void setRequestMaxRetries(int requestMaxRetries) {
- this.requestMaxRetries = requestMaxRetries;
- }
-
- public int getRequestTimeoutInMs() {
- return requestTimeoutInMs;
- }
-
- @Inject(optional = true)
- @Named("omid.client.requestTimeoutInMs")
- public void setRequestTimeoutInMs(int requestTimeoutInMs) {
- this.requestTimeoutInMs = requestTimeoutInMs;
- }
-
- public int getReconnectionDelayInSecs() {
- return reconnectionDelayInSecs;
- }
-
- @Inject(optional = true)
- @Named("omid.client.reconnectionDelayInSecs")
- public void setReconnectionDelayInSecs(int reconnectionDelayInSecs) {
- this.reconnectionDelayInSecs = reconnectionDelayInSecs;
- }
-
- public int getRetryDelayInMs() {
- return retryDelayInMs;
- }
-
- @Inject(optional = true)
- @Named("omid.client.retryDelayInMs")
- public void setRetryDelayInMs(int retryDelayInMs) {
- this.retryDelayInMs = retryDelayInMs;
- }
-
- public int getExecutorThreads() {
- return executorThreads;
- }
-
- @Inject(optional = true)
- @Named("omid.client.executorThreads")
- public void setExecutorThreads(int executorThreads) {
- this.executorThreads = executorThreads;
- }
-
- public String getZkCurrentTsoPath() {
- return zkCurrentTsoPath;
- }
-
- @Inject(optional = true)
- @Named("omid.ha.zkCurrentTsoPath")
- public void setZkCurrentTsoPath(String zkCurrentTsoPath) {
- this.zkCurrentTsoPath = zkCurrentTsoPath;
- }
-
- public String getZkNamespace() {
- return zkNamespace;
- }
-
- @Inject(optional = true)
- @Named("omid.ha.zkNamespace")
- public void setZkNamespace(String zkNamespace) {
- this.zkNamespace = zkNamespace;
- }
-
- public PostCommitMode getPostCommitMode() {
- return postCommitMode;
- }
-
- @Inject(optional = true)
- @Named("omid.tm.postCommitMode")
- public void setPostCommitMode(PostCommitMode postCommitMode) {
- this.postCommitMode = postCommitMode;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/transaction-client/src/main/java/com/yahoo/omid/tsoclient/ServiceUnavailableException.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/ServiceUnavailableException.java b/transaction-client/src/main/java/com/yahoo/omid/tsoclient/ServiceUnavailableException.java
deleted file mode 100644
index 2f8717e..0000000
--- a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/ServiceUnavailableException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tsoclient;
-
-/**
- * Thrown when the requests from TSO client to the TSO server have reached
- * a number of retries
- */
-public class ServiceUnavailableException extends Exception {
-
- private static final long serialVersionUID = -1551974284011474385L;
-
- public ServiceUnavailableException(String message) {
- super(message);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/transaction-client/src/main/java/com/yahoo/omid/tsoclient/TSOClient.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/TSOClient.java b/transaction-client/src/main/java/com/yahoo/omid/tsoclient/TSOClient.java
deleted file mode 100644
index dcfa6ca..0000000
--- a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/TSOClient.java
+++ /dev/null
@@ -1,933 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tsoclient;
-
-import com.google.common.base.Charsets;
-import com.google.common.net.HostAndPort;
-import com.google.common.util.concurrent.AbstractFuture;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.yahoo.omid.proto.TSOProto;
-import com.yahoo.omid.zk.ZKUtils;
-import com.yahoo.statemachine.StateMachine;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.cache.ChildData;
-import org.apache.curator.framework.recipes.cache.NodeCache;
-import org.apache.curator.framework.recipes.cache.NodeCacheListener;
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelHandler;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
-import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
-import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayDeque;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Describes the abstract methods to communicate to the TSO server
- */
-public class TSOClient implements TSOProtocol, NodeCacheListener {
-
- private static final Logger LOG = LoggerFactory.getLogger(TSOClient.class);
-
- // Basic configuration constants & defaults TODO: Move DEFAULT_ZK_CLUSTER to a conf class???
- public static final String DEFAULT_ZK_CLUSTER = "localhost:2181";
-
- private static final long DEFAULT_EPOCH = -1L;
- private volatile long epoch = DEFAULT_EPOCH;
-
- // Attributes
- private CuratorFramework zkClient;
- private NodeCache currentTSOZNode;
-
- private ChannelFactory factory;
- private ClientBootstrap bootstrap;
- private Channel currentChannel;
- private final ScheduledExecutorService fsmExecutor;
- StateMachine.Fsm fsm;
-
- private final int requestTimeoutInMs;
- private final int requestMaxRetries;
- private final int tsoReconnectionDelayInSecs;
- private InetSocketAddress tsoAddr;
- private String zkCurrentTsoPath;
-
- // ----------------------------------------------------------------------------------------------------------------
- // Construction
- // ----------------------------------------------------------------------------------------------------------------
-
- public static TSOClient newInstance(OmidClientConfiguration tsoClientConf)
- throws IOException, InterruptedException {
- return new TSOClient(tsoClientConf);
- }
-
- // Avoid instantiation
- private TSOClient(OmidClientConfiguration omidConf) throws IOException, InterruptedException {
-
- // Start client with Nb of active threads = 3 as maximum.
- int tsoExecutorThreads = omidConf.getExecutorThreads();
-
- factory = new NioClientSocketChannelFactory(
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setNameFormat("tsoclient-boss-%d").build()),
- Executors.newCachedThreadPool(
- new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build()), tsoExecutorThreads);
- // Create the bootstrap
- bootstrap = new ClientBootstrap(factory);
-
- requestTimeoutInMs = omidConf.getRequestTimeoutInMs();
- requestMaxRetries = omidConf.getRequestMaxRetries();
- tsoReconnectionDelayInSecs = omidConf.getReconnectionDelayInSecs();
-
- LOG.info("Connecting to TSO...");
- HostAndPort hp;
- switch (omidConf.getConnectionType()) {
- case HA:
- zkClient = ZKUtils.initZKClient(omidConf.getConnectionString(),
- omidConf.getZkNamespace(),
- omidConf.getZkConnectionTimeoutInSecs());
- zkCurrentTsoPath = omidConf.getZkCurrentTsoPath();
- configureCurrentTSOServerZNodeCache(zkCurrentTsoPath);
- String tsoInfo = getCurrentTSOInfoFoundInZK(zkCurrentTsoPath);
- // TSO info includes the new TSO host:port address and epoch
- String[] currentTSOAndEpochArray = tsoInfo.split("#");
- hp = HostAndPort.fromString(currentTSOAndEpochArray[0]);
- setTSOAddress(hp.getHostText(), hp.getPort());
- epoch = Long.parseLong(currentTSOAndEpochArray[1]);
- LOG.info("\t* Current TSO host:port found in ZK: {} Epoch {}", hp, getEpoch());
- break;
- case DIRECT:
- default:
- hp = HostAndPort.fromString(omidConf.getConnectionString());
- setTSOAddress(hp.getHostText(), hp.getPort());
- LOG.info("\t* TSO host:port {} will be connected directly", hp);
- break;
- }
-
- fsmExecutor = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactoryBuilder().setNameFormat("tsofsm-%d").build());
- fsm = new StateMachine.FsmImpl(fsmExecutor);
- fsm.setInitState(new DisconnectedState(fsm));
-
- ChannelPipeline pipeline = bootstrap.getPipeline();
- pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
- pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
- pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
- pipeline.addLast("protobufencoder", new ProtobufEncoder());
- pipeline.addLast("handler", new Handler(fsm));
-
- bootstrap.setOption("tcpNoDelay", true);
- bootstrap.setOption("keepAlive", true);
- bootstrap.setOption("reuseAddress", true);
- bootstrap.setOption("connectTimeoutMillis", 100);
- }
-
- // ----------------------------------------------------------------------------------------------------------------
- // TSOProtocol interface
- // ----------------------------------------------------------------------------------------------------------------
-
- /**
- * @see TSOProtocol#getNewStartTimestamp()
- */
- @Override
- public TSOFuture<Long> getNewStartTimestamp() {
- TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
- TSOProto.TimestampRequest.Builder tsreqBuilder = TSOProto.TimestampRequest.newBuilder();
- builder.setTimestampRequest(tsreqBuilder.build());
- RequestEvent request = new RequestEvent(builder.build(), requestMaxRetries);
- fsm.sendEvent(request);
- return new ForwardingTSOFuture<>(request);
- }
-
- /**
- * @see TSOProtocol#commit(long, Set)
- */
- @Override
- public TSOFuture<Long> commit(long transactionId, Set<? extends CellId> cells) {
- TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
- TSOProto.CommitRequest.Builder commitbuilder = TSOProto.CommitRequest.newBuilder();
- commitbuilder.setStartTimestamp(transactionId);
- for (CellId cell : cells) {
- commitbuilder.addCellId(cell.getCellId());
- }
- builder.setCommitRequest(commitbuilder.build());
- RequestEvent request = new RequestEvent(builder.build(), requestMaxRetries);
- fsm.sendEvent(request);
- return new ForwardingTSOFuture<>(request);
- }
-
- /**
- * @see TSOProtocol#close()
- */
- @Override
- public TSOFuture<Void> close() {
- final CloseEvent closeEvent = new CloseEvent();
- fsm.sendEvent(closeEvent);
- closeEvent.addListener(new Runnable() {
- @Override
- public void run() {
- try {
- closeEvent.get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- e.printStackTrace();
- } catch (ExecutionException e) {
- e.printStackTrace();
- } finally {
- fsmExecutor.shutdown();
- if (currentTSOZNode != null) {
- try {
- currentTSOZNode.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- if (zkClient != null) {
- zkClient.close();
- }
- }
-
- }
- }, fsmExecutor);
- return new ForwardingTSOFuture<>(closeEvent);
- }
-
- // ----------------------------------------------------------------------------------------------------------------
- // High availability related interface
- // ----------------------------------------------------------------------------------------------------------------
-
- /**
- * Returns the epoch of the TSO server that initialized this transaction.
- * Used for high availability support.
- */
- public long getEpoch() {
- return epoch;
- }
-
- // ----------------------------------------------------------------------------------------------------------------
- // NodeCacheListener interface
- // ----------------------------------------------------------------------------------------------------------------
-
- @Override
- public void nodeChanged() throws Exception {
-
- String tsoInfo = getCurrentTSOInfoFoundInZK(zkCurrentTsoPath);
- // TSO info includes the new TSO host:port address and epoch
- String[] currentTSOAndEpochArray = tsoInfo.split("#");
- HostAndPort hp = HostAndPort.fromString(currentTSOAndEpochArray[0]);
- setTSOAddress(hp.getHostText(), hp.getPort());
- epoch = Long.parseLong(currentTSOAndEpochArray[1]);
- LOG.info("CurrentTSO ZNode changed. New TSO Host & Port {}/Epoch {}", hp, getEpoch());
- if (currentChannel != null && currentChannel.isConnected()) {
- LOG.info("\tClosing channel with previous TSO {}", currentChannel);
- currentChannel.close();
- }
-
- }
-
- // ****************************************** Finite State Machine ************************************************
-
- // ----------------------------------------------------------------------------------------------------------------
- // FSM: Events
- // ----------------------------------------------------------------------------------------------------------------
-
- private static class ParamEvent<T> implements StateMachine.Event {
-
- final T param;
-
- ParamEvent(T param) {
- this.param = param;
- }
-
- T getParam() {
- return param;
- }
- }
-
- private static class ErrorEvent extends ParamEvent<Throwable> {
-
- ErrorEvent(Throwable t) {
- super(t);
- }
- }
-
- private static class ConnectedEvent extends ParamEvent<Channel> {
-
- ConnectedEvent(Channel c) {
- super(c);
- }
- }
-
- private static class UserEvent<T> extends AbstractFuture<T>
- implements StateMachine.DeferrableEvent {
-
- void success(T value) {
- set(value);
- }
-
- @Override
- public void error(Throwable t) {
- setException(t);
- }
- }
-
- private static class CloseEvent extends UserEvent<Void> {
-
- }
-
- private static class ChannelClosedEvent extends ParamEvent<Throwable> {
-
- ChannelClosedEvent(Throwable t) {
- super(t);
- }
- }
-
- private static class ReconnectEvent implements StateMachine.Event {
-
- }
-
- private static class HandshakeTimeoutEvent implements StateMachine.Event {
-
- }
-
- private static class TimestampRequestTimeoutEvent implements StateMachine.Event {
-
- }
-
- private static class CommitRequestTimeoutEvent implements StateMachine.Event {
-
- final long startTimestamp;
-
- CommitRequestTimeoutEvent(long startTimestamp) {
- this.startTimestamp = startTimestamp;
- }
-
- public long getStartTimestamp() {
- return startTimestamp;
- }
- }
-
- private static class RequestEvent extends UserEvent<Long> {
-
- TSOProto.Request req;
- int retriesLeft;
-
- RequestEvent(TSOProto.Request req, int retriesLeft) {
- this.req = req;
- this.retriesLeft = retriesLeft;
- }
-
- TSOProto.Request getRequest() {
- return req;
- }
-
- void setRequest(TSOProto.Request request) {
- this.req = request;
- }
-
- int getRetriesLeft() {
- return retriesLeft;
- }
-
- void decrementRetries() {
- retriesLeft--;
- }
-
- }
-
- private static class ResponseEvent extends ParamEvent<TSOProto.Response> {
-
- ResponseEvent(TSOProto.Response r) {
- super(r);
- }
- }
-
- // ----------------------------------------------------------------------------------------------------------------
- // FSM: States
- // ----------------------------------------------------------------------------------------------------------------
-
- class BaseState extends StateMachine.State {
-
- BaseState(StateMachine.Fsm fsm) {
- super(fsm);
- }
-
- public StateMachine.State handleEvent(StateMachine.Event e) {
- LOG.error("Unhandled event {} while in state {}", e, this.getClass().getName());
- return this;
- }
- }
-
- class DisconnectedState extends BaseState {
-
- DisconnectedState(StateMachine.Fsm fsm) {
- super(fsm);
- LOG.debug("NEW STATE: DISCONNECTED");
- }
-
- public StateMachine.State handleEvent(RequestEvent e) {
- fsm.deferEvent(e);
- return tryToConnectToTSOServer();
- }
-
- public StateMachine.State handleEvent(CloseEvent e) {
- factory.releaseExternalResources();
- e.success(null);
- return this;
- }
-
- private StateMachine.State tryToConnectToTSOServer() {
- final InetSocketAddress tsoAddress = getAddress();
- LOG.info("Trying to connect to TSO [{}]", tsoAddress);
- ChannelFuture channelFuture = bootstrap.connect(tsoAddress);
- channelFuture.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture channelFuture) throws Exception {
- if (channelFuture.isSuccess()) {
- LOG.info("Connection to TSO [{}] established. Channel {}",
- tsoAddress, channelFuture.getChannel());
- } else {
- LOG.error("Failed connection attempt to TSO [{}] failed. Channel {}",
- tsoAddress, channelFuture.getChannel());
- }
- }
- });
- return new ConnectingState(fsm);
- }
- }
-
- private class ConnectingState extends BaseState {
-
- ConnectingState(StateMachine.Fsm fsm) {
- super(fsm);
- LOG.debug("NEW STATE: CONNECTING");
- }
-
- public StateMachine.State handleEvent(UserEvent e) {
- fsm.deferEvent(e);
- return this;
- }
-
- public StateMachine.State handleEvent(ConnectedEvent e) {
- return new HandshakingState(fsm, e.getParam());
- }
-
- public StateMachine.State handleEvent(ChannelClosedEvent e) {
- return new ConnectionFailedState(fsm, e.getParam());
- }
-
- public StateMachine.State handleEvent(ErrorEvent e) {
- return new ConnectionFailedState(fsm, e.getParam());
- }
-
- }
-
- private static class RequestAndTimeout {
-
- final RequestEvent event;
- final Timeout timeout;
-
- RequestAndTimeout(RequestEvent event, Timeout timeout) {
- this.event = event;
- this.timeout = timeout;
- }
-
- RequestEvent getRequest() {
- return event;
- }
-
- Timeout getTimeout() {
- return timeout;
- }
-
- public String toString() {
- String info = "Request type ";
- if (event.getRequest().hasTimestampRequest()) {
- info += "[Timestamp]";
- } else if (event.getRequest().hasCommitRequest()) {
- info += "[Commit] Start TS ->" + event.getRequest().getCommitRequest().getStartTimestamp();
- } else {
- info += "NONE";
- }
- return info;
- }
- }
-
- private class HandshakingState extends BaseState {
-
- final Channel channel;
-
- final HashedWheelTimer timeoutExecutor = new HashedWheelTimer(
- new ThreadFactoryBuilder().setNameFormat("tso-client-timeout").build());
- final Timeout timeout;
-
- HandshakingState(StateMachine.Fsm fsm, Channel channel) {
- super(fsm);
- LOG.debug("NEW STATE: HANDSHAKING");
- this.channel = channel;
- TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
- // Add the required handshake capabilities when necessary
- handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
- channel.write(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
- timeout = newTimeout();
- }
-
- private Timeout newTimeout() {
- if (requestTimeoutInMs > 0) {
- return timeoutExecutor.newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) {
- fsm.sendEvent(new HandshakeTimeoutEvent());
- }
- }, 30, TimeUnit.SECONDS);
- } else {
- return null;
- }
- }
-
- public StateMachine.State handleEvent(UserEvent e) {
- fsm.deferEvent(e);
- return this;
- }
-
- public StateMachine.State handleEvent(ResponseEvent e) {
- if (e.getParam().hasHandshakeResponse() && e.getParam().getHandshakeResponse().getClientCompatible()) {
- if (timeout != null) {
- timeout.cancel();
- }
- return new ConnectedState(fsm, channel, timeoutExecutor);
- } else {
- cleanupState();
- LOG.error("Client incompatible with server");
- return new HandshakeFailedState(fsm, new HandshakeFailedException());
- }
- }
-
- public StateMachine.State handleEvent(HandshakeTimeoutEvent e) {
- cleanupState();
- return new ClosingState(fsm);
- }
-
- public StateMachine.State handleEvent(ErrorEvent e) {
- cleanupState();
- Throwable exception = e.getParam();
- LOG.error("Error during handshake", exception);
- return new HandshakeFailedState(fsm, exception);
- }
-
- private void cleanupState() {
- timeoutExecutor.stop();
- channel.close();
- if (timeout != null) {
- timeout.cancel();
- }
- }
-
- }
-
- class ConnectionFailedState extends BaseState {
-
- final HashedWheelTimer reconnectionTimeoutExecutor = new HashedWheelTimer(
- new ThreadFactoryBuilder().setNameFormat("tso-client-backoff-timeout").build());
-
- Throwable exception;
-
- ConnectionFailedState(final StateMachine.Fsm fsm, final Throwable exception) {
- super(fsm);
- LOG.debug("NEW STATE: CONNECTION FAILED [RE-CONNECTION BACKOFF]");
- this.exception = exception;
- reconnectionTimeoutExecutor.newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) {
- fsm.sendEvent(new ReconnectEvent());
- }
- }, tsoReconnectionDelayInSecs, TimeUnit.SECONDS);
- }
-
- public StateMachine.State handleEvent(UserEvent e) {
- e.error(exception);
- return this;
- }
-
- public StateMachine.State handleEvent(ErrorEvent e) {
- return this;
- }
-
- public StateMachine.State handleEvent(ChannelClosedEvent e) {
- return new DisconnectedState(fsm);
- }
-
- public StateMachine.State handleEvent(ReconnectEvent e) {
- return new DisconnectedState(fsm);
- }
-
- }
-
- private class HandshakeFailedState extends ConnectionFailedState {
-
- HandshakeFailedState(StateMachine.Fsm fsm, Throwable exception) {
- super(fsm, exception);
- LOG.debug("STATE: HANDSHAKING FAILED");
- }
-
- }
-
- class ConnectedState extends BaseState {
-
- final Queue<RequestAndTimeout> timestampRequests;
- final Map<Long, RequestAndTimeout> commitRequests;
- final Channel channel;
-
- final HashedWheelTimer timeoutExecutor;
-
- ConnectedState(StateMachine.Fsm fsm, Channel channel, HashedWheelTimer timeoutExecutor) {
- super(fsm);
- LOG.debug("NEW STATE: CONNECTED");
- this.channel = channel;
- this.timeoutExecutor = timeoutExecutor;
- timestampRequests = new ArrayDeque<>();
- commitRequests = new HashMap<>();
- }
-
- private Timeout newTimeout(final StateMachine.Event timeoutEvent) {
- if (requestTimeoutInMs > 0) {
- return timeoutExecutor.newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) {
- fsm.sendEvent(timeoutEvent);
- }
- }, requestTimeoutInMs, TimeUnit.MILLISECONDS);
- } else {
- return null;
- }
- }
-
- private void sendRequest(final StateMachine.Fsm fsm, RequestEvent request) {
- TSOProto.Request req = request.getRequest();
-
- if (req.hasTimestampRequest()) {
- timestampRequests.add(new RequestAndTimeout(request, newTimeout(new TimestampRequestTimeoutEvent())));
- } else if (req.hasCommitRequest()) {
- TSOProto.CommitRequest commitReq = req.getCommitRequest();
- commitRequests.put(commitReq.getStartTimestamp(), new RequestAndTimeout(
- request, newTimeout(new CommitRequestTimeoutEvent(commitReq.getStartTimestamp()))));
- } else {
- request.error(new IllegalArgumentException("Unknown request type"));
- return;
- }
- ChannelFuture f = channel.write(req);
-
- f.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) {
- if (!future.isSuccess()) {
- fsm.sendEvent(new ErrorEvent(future.getCause()));
- }
- }
- });
- }
-
- private void handleResponse(ResponseEvent response) {
- TSOProto.Response resp = response.getParam();
- if (resp.hasTimestampResponse()) {
- if (timestampRequests.size() == 0) {
- LOG.debug("Received timestamp response when no requests outstanding");
- return;
- }
- RequestAndTimeout e = timestampRequests.remove();
- e.getRequest().success(resp.getTimestampResponse().getStartTimestamp());
- if (e.getTimeout() != null) {
- e.getTimeout().cancel();
- }
- } else if (resp.hasCommitResponse()) {
- long startTimestamp = resp.getCommitResponse().getStartTimestamp();
- RequestAndTimeout e = commitRequests.remove(startTimestamp);
- if (e == null) {
- LOG.debug("Received commit response for request that doesn't exist. Start TS: {}", startTimestamp);
- return;
- }
- if (e.getTimeout() != null) {
- e.getTimeout().cancel();
- }
- if (resp.getCommitResponse().getAborted()) {
- e.getRequest().error(new AbortException());
- } else {
- // Check if the commit response received implies heuristic
- // actions during commit (because there's a new TSO master
- // replica) and inform the caller (e.g. the TxMgr) about it
- if (resp.getCommitResponse().getMakeHeuristicDecision()) {
- e.getRequest().error(new NewTSOException());
- } else {
- e.getRequest().success(resp.getCommitResponse().getCommitTimestamp());
- }
- }
- }
- }
-
- public StateMachine.State handleEvent(TimestampRequestTimeoutEvent e) {
- if (!timestampRequests.isEmpty()) {
- RequestAndTimeout r = timestampRequests.remove();
- if (r.getTimeout() != null) {
- r.getTimeout().cancel();
- }
- queueRetryOrError(fsm, r.getRequest());
- }
- return this;
- }
-
- public StateMachine.State handleEvent(CommitRequestTimeoutEvent e) {
- long startTimestamp = e.getStartTimestamp();
- if (commitRequests.containsKey(startTimestamp)) {
- RequestAndTimeout r = commitRequests.remove(startTimestamp);
- if (r.getTimeout() != null) {
- r.getTimeout().cancel();
- }
- queueRetryOrError(fsm, r.getRequest());
- }
- return this;
- }
-
- public StateMachine.State handleEvent(CloseEvent e) {
- LOG.debug("CONNECTED STATE: CloseEvent");
- timeoutExecutor.stop();
- closeChannelAndErrorRequests();
- fsm.deferEvent(e);
- return new ClosingState(fsm);
- }
-
- public StateMachine.State handleEvent(RequestEvent e) {
- sendRequest(fsm, e);
- return this;
- }
-
- public StateMachine.State handleEvent(ResponseEvent e) {
- handleResponse(e);
- return this;
- }
-
- public StateMachine.State handleEvent(ErrorEvent e) {
- LOG.debug("CONNECTED STATE: ErrorEvent");
- timeoutExecutor.stop();
- handleError(fsm);
- return new ClosingState(fsm);
- }
-
- private void handleError(StateMachine.Fsm fsm) {
- LOG.debug("CONNECTED STATE: Cancelling Timeouts in handleError");
- while (timestampRequests.size() > 0) {
- RequestAndTimeout r = timestampRequests.remove();
- if (r.getTimeout() != null) {
- r.getTimeout().cancel();
- }
- queueRetryOrError(fsm, r.getRequest());
- }
- Iterator<Map.Entry<Long, RequestAndTimeout>> iter = commitRequests.entrySet().iterator();
- while (iter.hasNext()) {
- RequestAndTimeout r = iter.next().getValue();
- if (r.getTimeout() != null) {
- r.getTimeout().cancel();
- }
- queueRetryOrError(fsm, r.getRequest());
- iter.remove();
- }
- channel.close();
- }
-
- private void queueRetryOrError(StateMachine.Fsm fsm, RequestEvent e) {
- if (e.getRetriesLeft() > 0) {
- e.decrementRetries();
- if (e.getRequest().hasCommitRequest()) {
- TSOProto.CommitRequest commitRequest = e.getRequest().getCommitRequest();
- if (!commitRequest.getIsRetry()) { // Create a new retry for the commit request
- TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
- TSOProto.CommitRequest.Builder commitBuilder = TSOProto.CommitRequest.newBuilder();
- commitBuilder.mergeFrom(commitRequest);
- commitBuilder.setIsRetry(true);
- builder.setCommitRequest(commitBuilder.build());
- e.setRequest(builder.build());
- }
- }
- fsm.sendEvent(e);
- } else {
- e.error(
- new ServiceUnavailableException("Number of retries exceeded. This API request failed permanently"));
- }
- }
-
- private void closeChannelAndErrorRequests() {
- channel.close();
- for (RequestAndTimeout r : timestampRequests) {
- if (r.getTimeout() != null) {
- r.getTimeout().cancel();
- }
- r.getRequest().error(new ClosingException());
- }
- for (RequestAndTimeout r : commitRequests.values()) {
- if (r.getTimeout() != null) {
- r.getTimeout().cancel();
- }
- r.getRequest().error(new ClosingException());
- }
- }
- }
-
- private class ClosingState extends BaseState {
-
- ClosingState(StateMachine.Fsm fsm) {
- super(fsm);
- LOG.debug("NEW STATE: CLOSING");
- }
-
- public StateMachine.State handleEvent(TimestampRequestTimeoutEvent e) {
- // Ignored. They will be retried or errored
- return this;
- }
-
- public StateMachine.State handleEvent(CommitRequestTimeoutEvent e) {
- // Ignored. They will be retried or errored
- return this;
- }
-
- public StateMachine.State handleEvent(ErrorEvent e) {
- // Ignored. They will be retried or errored
- return this;
- }
-
- public StateMachine.State handleEvent(ResponseEvent e) {
- // Ignored. They will be retried or errored
- return this;
- }
-
- public StateMachine.State handleEvent(UserEvent e) {
- fsm.deferEvent(e);
- return this;
- }
-
- public StateMachine.State handleEvent(ChannelClosedEvent e) {
- return new DisconnectedState(fsm);
- }
-
- public StateMachine.State handleEvent(HandshakeTimeoutEvent e) {
- return this;
- }
-
- }
-
- // ----------------------------------------------------------------------------------------------------------------
- // Helper classes & methods
- // ----------------------------------------------------------------------------------------------------------------
-
- private class Handler extends SimpleChannelHandler {
-
- private StateMachine.Fsm fsm;
-
- Handler(StateMachine.Fsm fsm) {
- this.fsm = fsm;
- }
-
- @Override
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
- currentChannel = e.getChannel();
- LOG.debug("HANDLER (CHANNEL CONNECTED): Connection {}. Sending connected event to FSM", e);
- fsm.sendEvent(new ConnectedEvent(e.getChannel()));
- }
-
- @Override
- public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- LOG.debug("HANDLER (CHANNEL DISCONNECTED): Connection {}. Sending error event to FSM", e);
- fsm.sendEvent(new ErrorEvent(new ConnectionException()));
- }
-
- @Override
- public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- LOG.debug("HANDLER (CHANNEL CLOSED): Connection {}. Sending channel closed event to FSM", e);
- fsm.sendEvent(new ChannelClosedEvent(new ConnectionException()));
- }
-
- @Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
- if (e.getMessage() instanceof TSOProto.Response) {
- fsm.sendEvent(new ResponseEvent((TSOProto.Response) e.getMessage()));
- } else {
- LOG.warn("Received unknown message", e.getMessage());
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
- LOG.error("Error on channel {}", ctx.getChannel(), e.getCause());
- fsm.sendEvent(new ErrorEvent(e.getCause()));
- }
- }
-
- private synchronized void setTSOAddress(String host, int port) {
- tsoAddr = new InetSocketAddress(host, port);
- }
-
- private synchronized InetSocketAddress getAddress() {
- return tsoAddr;
- }
-
- private void configureCurrentTSOServerZNodeCache(String currentTsoPath) {
- try {
- currentTSOZNode = new NodeCache(zkClient, currentTsoPath);
- currentTSOZNode.getListenable().addListener(this);
- currentTSOZNode.start(true);
- } catch (Exception e) {
- throw new IllegalStateException("Cannot start watcher on current TSO Server ZNode: " + e.getMessage());
- }
- }
-
- private String getCurrentTSOInfoFoundInZK(String currentTsoPath) {
- ChildData currentTSOData = currentTSOZNode.getCurrentData();
- if (currentTSOData == null) {
- throw new IllegalStateException("No data found in ZKNode " + currentTsoPath);
- }
- byte[] currentTSOAndEpochAsBytes = currentTSOData.getData();
- if (currentTSOAndEpochAsBytes == null) {
- throw new IllegalStateException("No data found for current TSO in ZKNode " + currentTsoPath);
- }
- return new String(currentTSOAndEpochAsBytes, Charsets.UTF_8);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/transaction-client/src/main/java/com/yahoo/omid/tsoclient/TSOFuture.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/TSOFuture.java b/transaction-client/src/main/java/com/yahoo/omid/tsoclient/TSOFuture.java
deleted file mode 100644
index 0999065..0000000
--- a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/TSOFuture.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tsoclient;
-
-import java.util.concurrent.Executor;
-import java.util.concurrent.Future;
-
-public interface TSOFuture<T> extends Future<T> {
- public void addListener(Runnable listener, Executor executor);
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/transaction-client/src/main/java/com/yahoo/omid/tsoclient/TSOProtocol.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/TSOProtocol.java b/transaction-client/src/main/java/com/yahoo/omid/tsoclient/TSOProtocol.java
deleted file mode 100644
index 553a15c..0000000
--- a/transaction-client/src/main/java/com/yahoo/omid/tsoclient/TSOProtocol.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tsoclient;
-
-import java.util.Set;
-
-/**
- * Defines the protocol used on the client side to abstract communication to the TSO server
- */
-public interface TSOProtocol {
-
- /**
- * Returns a new timestamp assigned by on the server-side
- * @return the newly assigned timestamp as a future. If an error was detected, the future will contain a
- * corresponding protocol exception
- * @see TimestampOracle
- * @see TSOServer
- */
- TSOFuture<Long> getNewStartTimestamp();
-
- /**
- * Returns the result of the conflict detection made on the server-side for the specified transaction
- * @param transactionId
- * the transaction to check for conflicts
- * @param writeSet
- * the writeSet of the transaction, which includes all the modified cells
- * @return the commit timestamp as a future if the transaction was committed. If the transaction was aborted due
- * to conflicts with a concurrent transaction, the future will include an AbortException. If an error was detected,
- * the future will contain a corresponding protocol exception
- * @see TimestampOracle
- * @see TSOServer
- */
- TSOFuture<Long> commit(long transactionId, Set<? extends CellId> writeSet);
-
- /**
- * Closes the communication with the TSO server
- * @return nothing. If an error was detected, the future will contain a corresponding protocol exception
- */
- TSOFuture<Void> close();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/transaction-client/src/main/resources/omid-client-config.yml
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/resources/omid-client-config.yml b/transaction-client/src/main/resources/omid-client-config.yml
index 7e362bb..daed7d3 100644
--- a/transaction-client/src/main/resources/omid-client-config.yml
+++ b/transaction-client/src/main/resources/omid-client-config.yml
@@ -7,13 +7,13 @@
# ---------------------------------------------------------------------------------------------------------------------
# Direct connection to host:port
-connectionType: !!com.yahoo.omid.tsoclient.OmidClientConfiguration$ConnType DIRECT
+connectionType: !!com.yahoo.omid.tso.client.OmidClientConfiguration$ConnType DIRECT
connectionString: "localhost:54758"
# When Omid is working in High Availability mode, two or more replicas of the TSO server are running in primary/backup
# mode. When a TSO server replica is elected as master, it publishes its address through ZK. In order to configure
# the Omid client to access the TSO server in HA mode:
-# 1) set 'connectionType' to !!com.yahoo.omid.tsoclient.OmidClientConfiguration$ConnType HA
+# 1) set 'connectionType' to !!com.yahoo.omid.tso.client.OmidClientConfiguration$ConnType HA
# 2) set 'connectionString' to the ZK cluster connection string where the server is publishing its address
zkConnectionTimeoutInSecs: 10
# In HA mode, make sure that the next settings match same settings on the TSO server side
@@ -36,4 +36,4 @@ executorThreads: 3
# Configure whether the TM performs the post-commit actions for a tx (update shadow cells and clean commit table entry)
# before returning to the control to the client (SYNC) or in parallel (ASYNC)
-postCommitMode: !!com.yahoo.omid.tsoclient.OmidClientConfiguration$PostCommitMode SYNC
\ No newline at end of file
+postCommitMode: !!com.yahoo.omid.tso.client.OmidClientConfiguration$PostCommitMode SYNC
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/transaction-client/src/test/java/com/yahoo/omid/tso/client/TestMockTSOClient.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/test/java/com/yahoo/omid/tso/client/TestMockTSOClient.java b/transaction-client/src/test/java/com/yahoo/omid/tso/client/TestMockTSOClient.java
new file mode 100644
index 0000000..deca92f
--- /dev/null
+++ b/transaction-client/src/test/java/com/yahoo/omid/tso/client/TestMockTSOClient.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.omid.tso.client;
+
+import com.google.common.collect.Sets;
+import com.yahoo.omid.committable.CommitTable;
+import com.yahoo.omid.committable.InMemoryCommitTable;
+import com.yahoo.omid.tso.util.DummyCellIdImpl;
+import org.testng.Assert;
+import org.testng.AssertJUnit;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.ExecutionException;
+
+import static org.testng.AssertJUnit.assertEquals;
+
+public class TestMockTSOClient {
+
+ final static public CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
+ final static public CellId c2 = new DummyCellIdImpl(-0xfeedcafeL);
+
+ @Test(timeOut = 10000)
+ public void testConflicts() throws Exception {
+ CommitTable commitTable = new InMemoryCommitTable();
+ TSOProtocol client = new MockTSOClient(commitTable.getWriter());
+
+ long tr1 = client.getNewStartTimestamp().get();
+ long tr2 = client.getNewStartTimestamp().get();
+
+ client.commit(tr1, Sets.newHashSet(c1)).get();
+
+ try {
+ client.commit(tr2, Sets.newHashSet(c1, c2)).get();
+ Assert.fail("Shouldn't have committed");
+ } catch (ExecutionException ee) {
+ assertEquals("Should have aborted", ee.getCause().getClass(), AbortException.class);
+ }
+ }
+
+ @Test(timeOut = 10000)
+ public void testWatermarkUpdate() throws Exception {
+ CommitTable commitTable = new InMemoryCommitTable();
+ TSOProtocol client = new MockTSOClient(commitTable.getWriter());
+ CommitTable.Client commitTableClient = commitTable.getClient();
+
+ long tr1 = client.getNewStartTimestamp().get();
+ client.commit(tr1, Sets.newHashSet(c1)).get();
+
+ long initWatermark = commitTableClient.readLowWatermark().get();
+
+ long tr2 = client.getNewStartTimestamp().get();
+ client.commit(tr2, Sets.newHashSet(c1)).get();
+
+ long newWatermark = commitTableClient.readLowWatermark().get();
+ AssertJUnit.assertTrue("new low watermark should be bigger", newWatermark > initWatermark);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/transaction-client/src/test/java/com/yahoo/omid/tso/client/TestOmidClientConfiguration.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/test/java/com/yahoo/omid/tso/client/TestOmidClientConfiguration.java b/transaction-client/src/test/java/com/yahoo/omid/tso/client/TestOmidClientConfiguration.java
new file mode 100644
index 0000000..731401e
--- /dev/null
+++ b/transaction-client/src/test/java/com/yahoo/omid/tso/client/TestOmidClientConfiguration.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.omid.tso.client;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestOmidClientConfiguration {
+
+ @Test
+ public void testYamlReading() {
+ OmidClientConfiguration configuration = new OmidClientConfiguration();
+ Assert.assertNotNull(configuration.getConnectionString());
+ Assert.assertNotNull(configuration.getConnectionType());
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/transaction-client/src/test/java/com/yahoo/omid/tsoclient/TestMockTSOClient.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/test/java/com/yahoo/omid/tsoclient/TestMockTSOClient.java b/transaction-client/src/test/java/com/yahoo/omid/tsoclient/TestMockTSOClient.java
deleted file mode 100644
index 5ea5ffb..0000000
--- a/transaction-client/src/test/java/com/yahoo/omid/tsoclient/TestMockTSOClient.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tsoclient;
-
-import com.google.common.collect.Sets;
-import com.yahoo.omid.committable.CommitTable;
-import com.yahoo.omid.committable.InMemoryCommitTable;
-import com.yahoo.omid.tso.util.DummyCellIdImpl;
-import org.testng.Assert;
-import org.testng.AssertJUnit;
-import org.testng.annotations.Test;
-
-import java.util.concurrent.ExecutionException;
-
-import static org.testng.AssertJUnit.assertEquals;
-
-public class TestMockTSOClient {
-
- final static public CellId c1 = new DummyCellIdImpl(0xdeadbeefL);
- final static public CellId c2 = new DummyCellIdImpl(-0xfeedcafeL);
-
- @Test(timeOut = 10000)
- public void testConflicts() throws Exception {
- CommitTable commitTable = new InMemoryCommitTable();
- TSOProtocol client = new MockTSOClient(commitTable.getWriter());
-
- long tr1 = client.getNewStartTimestamp().get();
- long tr2 = client.getNewStartTimestamp().get();
-
- client.commit(tr1, Sets.newHashSet(c1)).get();
-
- try {
- client.commit(tr2, Sets.newHashSet(c1, c2)).get();
- Assert.fail("Shouldn't have committed");
- } catch (ExecutionException ee) {
- assertEquals("Should have aborted", ee.getCause().getClass(), AbortException.class);
- }
- }
-
- @Test(timeOut = 10000)
- public void testWatermarkUpdate() throws Exception {
- CommitTable commitTable = new InMemoryCommitTable();
- TSOProtocol client = new MockTSOClient(commitTable.getWriter());
- CommitTable.Client commitTableClient = commitTable.getClient();
-
- long tr1 = client.getNewStartTimestamp().get();
- client.commit(tr1, Sets.newHashSet(c1)).get();
-
- long initWatermark = commitTableClient.readLowWatermark().get();
-
- long tr2 = client.getNewStartTimestamp().get();
- client.commit(tr2, Sets.newHashSet(c1)).get();
-
- long newWatermark = commitTableClient.readLowWatermark().get();
- AssertJUnit.assertTrue("new low watermark should be bigger", newWatermark > initWatermark);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/transaction-client/src/test/java/com/yahoo/omid/tsoclient/TestOmidClientConfiguration.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/test/java/com/yahoo/omid/tsoclient/TestOmidClientConfiguration.java b/transaction-client/src/test/java/com/yahoo/omid/tsoclient/TestOmidClientConfiguration.java
deleted file mode 100644
index 497ec6d..0000000
--- a/transaction-client/src/test/java/com/yahoo/omid/tsoclient/TestOmidClientConfiguration.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.yahoo.omid.tsoclient;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-public class TestOmidClientConfiguration {
-
- @Test
- public void testYamlReading() {
- OmidClientConfiguration configuration = new OmidClientConfiguration();
- Assert.assertNotNull(configuration.getConnectionString());
- Assert.assertNotNull(configuration.getConnectionType());
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/tso-server/src/test/java/com/yahoo/omid/tso/TestLeaseManager.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/com/yahoo/omid/tso/TestLeaseManager.java b/tso-server/src/test/java/com/yahoo/omid/tso/TestLeaseManager.java
index 4da3c45..71e2ecb 100644
--- a/tso-server/src/test/java/com/yahoo/omid/tso/TestLeaseManager.java
+++ b/tso-server/src/test/java/com/yahoo/omid/tso/TestLeaseManager.java
@@ -33,7 +33,7 @@ import org.testng.annotations.Test;
import java.io.IOException;
-import static com.yahoo.omid.tsoclient.TSOClient.DEFAULT_ZK_CLUSTER;
+import static com.yahoo.omid.tso.client.TSOClient.DEFAULT_ZK_CLUSTER;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/tso-server/src/test/java/com/yahoo/omid/tso/client/TSOClientAccessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/com/yahoo/omid/tso/client/TSOClientAccessor.java b/tso-server/src/test/java/com/yahoo/omid/tso/client/TSOClientAccessor.java
new file mode 100644
index 0000000..eeff214
--- /dev/null
+++ b/tso-server/src/test/java/com/yahoo/omid/tso/client/TSOClientAccessor.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.omid.tso.client;
+
+import com.yahoo.statemachine.StateMachine.FsmImpl;
+
+public class TSOClientAccessor {
+
+ public static void closeChannel(TSOClient tsoClient) throws InterruptedException {
+ FsmImpl fsm = (FsmImpl) tsoClient.fsm;
+ TSOClient.ConnectedState connectedState = (TSOClient.ConnectedState) fsm.getState();
+ connectedState.channel.close().await();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/c6410f7d/tso-server/src/test/java/com/yahoo/omid/tso/client/TSOClientOneShot.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/com/yahoo/omid/tso/client/TSOClientOneShot.java b/tso-server/src/test/java/com/yahoo/omid/tso/client/TSOClientOneShot.java
new file mode 100644
index 0000000..03dbbd7
--- /dev/null
+++ b/tso-server/src/test/java/com/yahoo/omid/tso/client/TSOClientOneShot.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.yahoo.omid.tso.client;
+
+import com.yahoo.omid.proto.TSOProto;
+import com.yahoo.omid.proto.TSOProto.Response;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Communication endpoint for TSO clients.
+ */
+public class TSOClientOneShot {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TSOClientOneShot.class);
+
+ private final String host;
+ private final int port;
+
+ public TSOClientOneShot(String host, int port) {
+
+ this.host = host;
+ this.port = port;
+
+ }
+
+ public TSOProto.Response makeRequest(TSOProto.Request request)
+ throws InterruptedException, ExecutionException {
+ TSOClientRaw raw = new TSOClientRaw(host, port);
+
+ // do handshake
+ TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
+ handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
+ raw.write(TSOProto.Request.newBuilder()
+ .setHandshakeRequest(handshake.build()).build());
+ Response response = raw.getResponse().get();
+ assert (response.getHandshakeResponse().getClientCompatible());
+
+ raw.write(request);
+ response = raw.getResponse().get();
+
+ raw.close();
+ return response;
+ }
+
+}