You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/06 23:02:47 UTC
[41/51] [partial] incubator-tephra git commit: Rename package to
org.apache.tephra
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersionSpecificFactory.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersionSpecificFactory.java b/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersionSpecificFactory.java
deleted file mode 100644
index 2bc51b5..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersionSpecificFactory.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.util;
-
-import com.google.inject.Provider;
-import com.google.inject.ProvisionException;
-import org.apache.twill.internal.utils.Instances;
-
-/**
- * Common class factory behavior for classes which need specific implementations depending on HBase versions.
- * Specific factories can subclass this class and simply plug in the class names for their implementations.
- *
- * @param <T> Version specific class provided by this factory.
- */
-public abstract class HBaseVersionSpecificFactory<T> implements Provider<T> {
- @Override
- public T get() {
- T instance = null;
- try {
- switch (HBaseVersion.get()) {
- case HBASE_94:
- throw new ProvisionException("HBase 0.94 is no longer supported. Please upgrade to HBase 0.96 or newer.");
- case HBASE_96:
- instance = createInstance(getHBase96Classname());
- break;
- case HBASE_98:
- instance = createInstance(getHBase98Classname());
- break;
- case HBASE_10:
- instance = createInstance(getHBase10Classname());
- break;
- case HBASE_10_CDH:
- instance = createInstance(getHBase10CDHClassname());
- break;
- case HBASE_11:
- case HBASE_12_CDH:
- instance = createInstance(getHBase11Classname());
- break;
- case UNKNOWN:
- throw new ProvisionException("Unknown HBase version: " + HBaseVersion.getVersionString());
- }
- } catch (ClassNotFoundException cnfe) {
- throw new ProvisionException(cnfe.getMessage(), cnfe);
- }
- return instance;
- }
-
- protected T createInstance(String className) throws ClassNotFoundException {
- Class clz = Class.forName(className);
- return (T) Instances.newInstance(clz);
- }
-
- protected abstract String getHBase96Classname();
- protected abstract String getHBase98Classname();
- protected abstract String getHBase10Classname();
- protected abstract String getHBase10CDHClassname();
- protected abstract String getHBase11Classname();
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/util/TxUtils.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/util/TxUtils.java b/tephra-core/src/main/java/co/cask/tephra/util/TxUtils.java
deleted file mode 100644
index cdc9fef..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/util/TxUtils.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.util;
-
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionType;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.persist.TransactionVisibilityState;
-import com.google.common.primitives.Longs;
-
-import java.util.Map;
-
-/**
- * Utility methods supporting transaction operations.
- */
-public class TxUtils {
-
- // Any cell with timestamp less than MAX_NON_TX_TIMESTAMP is assumed to be pre-existing data,
- // i.e. data written before table was converted into transactional table using Tephra.
- // Using 1.1 times current time to determine whether a timestamp is transactional timestamp or not is safe, and does
- // not produce any false positives or false negatives.
- //
- // To prove this, let's say the earliest transactional timestamp written by Tephra was in year 2000, and the latest
- // that will be written is in year 2200.
- // 01-Jan-2000 GMT is 946684800000 milliseconds since epoch.
- // 31-Dec-2200 GMT is 7289654399000 milliseconds since epoch.
- //
- // Let's say that we enabled transactions on a table on 01-Jan-2000, then 1.1 * 946684800000 = 31-Dec-2002. Using
- // 31-Dec-2002, we can safely say from 01-Jan-2000 onwards, whether a cell timestamp is
- // non-transactional (<= 946684800000).
- // Note that transactional timestamp will be greater than 946684800000000000 (> year 31969) at this instant.
- //
- // On the other end, let's say we enabled transactions on a table on 31-Dec-2200,
- // then 1.1 * 7289654399000 = 07-Feb-2224. Again, we can use this time from 31-Dec-2200 onwards to say whether a
- // cell timestamp is non-transactional (<= 7289654399000).
- // Note that transactional timestamp will be greater than 7289654399000000000 (> year 232969) at this instant.
- private static final long MAX_NON_TX_TIMESTAMP = (long) (System.currentTimeMillis() * 1.1);
-
- /**
- * Returns the oldest visible timestamp for the given transaction, based on the TTLs configured for each column
- * family. If any column family has no TTL set (or no TTLs are given), the oldest visible timestamp will be {@code 0}.
- * @param ttlByFamily A map of column family name to TTL value (in milliseconds)
- * @param tx The current transaction
- * @return The oldest timestamp that will be visible for the given transaction and TTL configuration
- */
- public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Transaction tx) {
- long maxTTL = getMaxTTL(ttlByFamily);
- // we know that data will not be cleaned up while this tx is running up to this point as janitor uses it
- return maxTTL < Long.MAX_VALUE ? tx.getVisibilityUpperBound() - maxTTL * TxConstants.MAX_TX_PER_MS : 0;
- }
-
- /**
- * Returns the oldest visible timestamp for the given transaction, based on the TTLs configured for each column
- * family. If any column family has no TTL set (or no TTLs are given), the oldest visible timestamp will be {@code 0}.
- * @param ttlByFamily A map of column family name to TTL value (in milliseconds)
- * @param tx The current transaction
- * @param readNonTxnData indicates that the timestamp returned should allow reading non-transactional data
- * @return The oldest timestamp that will be visible for the given transaction and TTL configuration
- */
- public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Transaction tx, boolean readNonTxnData) {
- if (readNonTxnData) {
- long maxTTL = getMaxTTL(ttlByFamily);
- return maxTTL < Long.MAX_VALUE ? System.currentTimeMillis() - maxTTL : 0;
- }
-
- return getOldestVisibleTimestamp(ttlByFamily, tx);
- }
-
- /**
- * Returns the maximum timestamp to use for time-range operations, based on the given transaction.
- * @param tx The current transaction
- * @return The maximum timestamp (exclusive) to use for time-range operations
- */
- public static long getMaxVisibleTimestamp(Transaction tx) {
- // NOTE: +1 here because we want read up to writepointer inclusive, but timerange's end is exclusive
- // however, we also need to guard against overflow in the case write pointer is set to MAX_VALUE
- return tx.getWritePointer() < Long.MAX_VALUE ?
- tx.getWritePointer() + 1 : tx.getWritePointer();
- }
-
- /**
- * Creates a "dummy" transaction based on the given txVisibilityState's state. This is not a "real" transaction in
- * the sense that it has not been started, data should not be written with it, and it cannot be committed. However,
- * this can still be useful for filtering data according to the txVisibilityState's state. Instead of the actual
- * write pointer from the txVisibilityState, however, we use {@code Long.MAX_VALUE} to avoid mis-identifying any cells
- * as being written by this transaction (and therefore visible).
- */
- public static Transaction createDummyTransaction(TransactionVisibilityState txVisibilityState) {
- return new Transaction(txVisibilityState.getReadPointer(), Long.MAX_VALUE,
- Longs.toArray(txVisibilityState.getInvalid()),
- Longs.toArray(txVisibilityState.getInProgress().keySet()),
- TxUtils.getFirstShortInProgress(txVisibilityState.getInProgress()), TransactionType.SHORT);
- }
-
- /**
- * Returns the write pointer for the first "short" transaction that is in the in-progress set, or
- * {@link Transaction#NO_TX_IN_PROGRESS} if there is none.
- */
- public static long getFirstShortInProgress(Map<Long, TransactionManager.InProgressTx> inProgress) {
- long firstShort = Transaction.NO_TX_IN_PROGRESS;
- for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) {
- if (!entry.getValue().isLongRunning()) {
- firstShort = entry.getKey();
- break;
- }
- }
- return firstShort;
- }
-
- /**
- * Returns the timestamp for calculating time to live for the given cell timestamp.
- * This takes into account pre-existing non-transactional cells while calculating the time.
- */
- public static long getTimestampForTTL(long cellTs) {
- return isPreExistingVersion(cellTs) ? cellTs * TxConstants.MAX_TX_PER_MS : cellTs;
- }
-
- /**
- * Returns the max TTL for the given TTL values. Returns Long.MAX_VALUE if any of the column families has no TTL set.
- */
- private static long getMaxTTL(Map<byte[], Long> ttlByFamily) {
- long maxTTL = 0;
- for (Long familyTTL : ttlByFamily.values()) {
- maxTTL = Math.max(familyTTL <= 0 ? Long.MAX_VALUE : familyTTL, maxTTL);
- }
- return maxTTL == 0 ? Long.MAX_VALUE : maxTTL;
- }
-
- /**
- * Returns true if version was written before table was converted into transactional table, false otherwise.
- */
- public static boolean isPreExistingVersion(long version) {
- return version < MAX_NON_TX_TIMESTAMP;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/visibility/DefaultFenceWait.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/visibility/DefaultFenceWait.java b/tephra-core/src/main/java/co/cask/tephra/visibility/DefaultFenceWait.java
deleted file mode 100644
index 3d5b25b..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/visibility/DefaultFenceWait.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.visibility;
-
-import co.cask.tephra.TransactionContext;
-import co.cask.tephra.TransactionFailureException;
-import com.google.common.base.Stopwatch;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Default implementation of {@link FenceWait}.
- */
-public class DefaultFenceWait implements FenceWait {
- private static final Logger LOG = LoggerFactory.getLogger(DefaultFenceWait.class);
-
- private final TransactionContext txContext;
-
- DefaultFenceWait(TransactionContext txContext) {
- this.txContext = txContext;
- }
-
- @Override
- public void await(long timeout, TimeUnit timeUnit)
- throws TransactionFailureException, InterruptedException, TimeoutException {
- Stopwatch stopwatch = new Stopwatch();
- stopwatch.start();
- long sleepTimeMicros = timeUnit.toMicros(timeout) / 10;
- // Clamp the sleep time to between 1 microsecond and 500 milliseconds
- sleepTimeMicros = Math.max(Math.min(sleepTimeMicros, 500 * 1000), 1);
- while (stopwatch.elapsedTime(timeUnit) < timeout) {
- txContext.start();
- try {
- txContext.finish();
- return;
- } catch (TransactionFailureException e) {
- LOG.error("Got exception waiting for fence. Sleeping for {} microseconds", sleepTimeMicros, e);
- txContext.abort();
- TimeUnit.MICROSECONDS.sleep(sleepTimeMicros);
- }
- }
- throw new TimeoutException("Timeout waiting for fence");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/visibility/FenceWait.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/visibility/FenceWait.java b/tephra-core/src/main/java/co/cask/tephra/visibility/FenceWait.java
deleted file mode 100644
index a5370b2..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/visibility/FenceWait.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.visibility;
-
-import co.cask.tephra.TransactionFailureException;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * Used by a writer to wait on a fence so that changes are visible to all readers with in-progress transactions.
- */
-public interface FenceWait {
- /**
- * Waits until the fence is complete, or till the timeout specified. The fence wait transaction will get re-tried
- * several times until the timeout.
- * <p>
- *
- * If a fence wait times out then it means there are still some readers with in-progress transactions that have not
- * seen the change. In this case the wait will have to be retried using the same FenceWait object.
- *
- * @param timeout Maximum time to wait
- * @param timeUnit {@link TimeUnit} of the timeout
- * @throws TransactionFailureException when not able to start fence wait transaction
- * @throws InterruptedException on any interrupt
- * @throws TimeoutException when timeout is reached
- */
- void await(long timeout, TimeUnit timeUnit)
- throws TransactionFailureException, InterruptedException, TimeoutException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/visibility/ReadFence.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/visibility/ReadFence.java b/tephra-core/src/main/java/co/cask/tephra/visibility/ReadFence.java
deleted file mode 100644
index f7b8850..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/visibility/ReadFence.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.visibility;
-
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionAware;
-import com.google.common.primitives.Bytes;
-import com.google.common.primitives.Longs;
-
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * The {@link TransactionAware} fence that {@link VisibilityFence} hands out to readers.
- */
-class ReadFence implements TransactionAware {
- private final byte[] fenceId;
- private Transaction tx;
-
- public ReadFence(byte[] fenceId) {
- this.fenceId = fenceId;
- }
-
- @Override
- public void startTx(Transaction tx) {
- this.tx = tx;
- }
-
- @Override
- public void updateTx(Transaction tx) {
- // Fences only need original transaction
- }
-
- @Override
- public Collection<byte[]> getTxChanges() {
- if (tx == null) {
- throw new IllegalStateException("Transaction has not started yet");
- }
- return Collections.singleton(Bytes.concat(fenceId, Longs.toByteArray(tx.getTransactionId())));
- }
-
- @Override
- public boolean commitTx() throws Exception {
- // Nothing to persist
- return true;
- }
-
- @Override
- public void postTxCommit() {
- tx = null;
- }
-
- @Override
- public boolean rollbackTx() throws Exception {
- // Nothing to rollback
- return true;
- }
-
- @Override
- public String getTransactionAwareName() {
- return getClass().getSimpleName();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/visibility/VisibilityFence.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/visibility/VisibilityFence.java b/tephra-core/src/main/java/co/cask/tephra/visibility/VisibilityFence.java
deleted file mode 100644
index beb92ed..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/visibility/VisibilityFence.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.visibility;
-
-import co.cask.tephra.TransactionAware;
-import co.cask.tephra.TransactionContext;
-import co.cask.tephra.TransactionFailureException;
-import co.cask.tephra.TransactionSystemClient;
-
-import java.util.concurrent.TimeoutException;
-
-/**
- * VisibilityFence is used to ensure that after a given point in time, all readers see an updated change
- * that got committed.
- * <p>
- *
- * Typically a reader will never conflict with a writer, since a reader only sees committed changes when its
- * transaction started. However to ensure that after a given point all readers are aware of a change,
- * we have to introduce a conflict between a reader and a writer that act on the same data concurrently.
- *<p>
- *
- * This is done by the reader indicating that it is interested in changes to a piece of data by using a fence
- * in its transaction. If there are no changes to the data when reader tries to commit the transaction
- * containing the fence, the commit succeeds.
- * <p>
- *
- * On the other hand, a writer updates the same data in a transaction. After the write transaction is committed,
- * the writer then waits on the fence to ensure that all in-progress readers are aware of this update.
- * When the wait on the fence returns successfully, it means that
- * any in-progress readers that have not seen the change will not be allowed to commit anymore. This will
- * force the readers to start a new transaction, and this ensures that the changes made by writer are visible
- * to the readers.
- *<p>
- *
- * In case an in-progress reader commits when the writer is waiting on the fence, then the wait method will retry
- * until the given timeout.
- * <p>
- *
- * Hence a successful await on a fence ensures that any reader (using the same fence) that successfully commits after
- * this point onwards would see the change.
- *
- * <p>
- * Sample reader code:
- * <pre>
- * <code>
- * TransactionAware fence = VisibilityFence.create(fenceId);
- * TransactionContext readTxContext = new TransactionContext(txClient, fence, table1, table2, ...);
- * readTxContext.start();
- *
- * // do operations using table1, table2, etc.
- *
- * // finally commit
- * try {
- * readTxContext.finish();
- * } catch (TransactionConflictException e) {
- * // handle conflict by aborting and starting over with a new transaction
- * }
- * </code>
- * </pre>
- *<p>
- *
- * Sample writer code:
- * <pre>
- * <code>
- * // start transaction
- * // write change
- * // commit transaction
- *
- * // Now wait on the fence (with the same fenceId as the readers) to ensure that all in-progress readers are
- * // aware of this change
- * try {
- * FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, txClient);
- * fenceWait.await(50000, TimeUnit.MILLISECONDS);
- * } catch (TimeoutException e) {
- * // await timed out, the change may not be visible to all in-progress readers.
- * // Application has two options at this point:
- * // 1. Revert the write. Re-try the write and fence wait again.
- * // 2. Retry only the wait with the same fenceWait object (retry logic is not shown here).
- * }
- * </code>
- * </pre>
- *
- * fenceId in the above samples refers to any id that both the readers and writer know for a given
- * piece of data. Both readers and writer will have to use the same fenceId to synchronize on a given data.
- * Typically fenceId uniquely identifies the data in question.
- * For example, if the data is a table row, the fenceId can be composed of table name and row key.
- * If the data is a table cell, the fenceId can be composed of table name, row key, and column key.
- *<p>
- *
- * Note that in this implementation, if a reader starts a transaction after the write is committed, and a writer
- * then successfully starts and completes an await on the fence while that read transaction is still in progress,
- * the reader will get a conflict when committing the fence even though it has already seen the latest changes.
- * This is because today there is no way to determine the commit time of a transaction.
- */
-public final class VisibilityFence {
- private VisibilityFence() {
- // Cannot instantiate this class, all functionality is through static methods.
- }
-
- /**
- * Used by a reader to get a fence that can be added to its transaction context.
- *
- * @param fenceId uniquely identifies the data that this fence is used to synchronize.
- * If the data is a table cell then this id can be composed of the table name, row key
- * and column key for the data.
- * @return {@link TransactionAware} to be added to reader's transaction context.
- */
- public static TransactionAware create(byte[] fenceId) {
- return new ReadFence(fenceId);
- }
-
- /**
- * Used by a writer to wait on a fence so that changes are visible to all readers with in-progress transactions.
- *
- * @param fenceId uniquely identifies the data that this fence is used to synchronize.
- * If the data is a table cell then this id can be composed of the table name, row key
- * and column key for the data.
- * @return {@link FenceWait} object
- */
- public static FenceWait prepareWait(byte[] fenceId, TransactionSystemClient txClient)
- throws TransactionFailureException, InterruptedException, TimeoutException {
- return new DefaultFenceWait(new TransactionContext(txClient, new WriteFence(fenceId)));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/visibility/WriteFence.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/visibility/WriteFence.java b/tephra-core/src/main/java/co/cask/tephra/visibility/WriteFence.java
deleted file mode 100644
index a16264d..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/visibility/WriteFence.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.visibility;
-
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionAware;
-import com.google.common.primitives.Bytes;
-import com.google.common.primitives.Longs;
-import com.google.common.primitives.UnsignedBytes;
-
-import java.util.Collection;
-import java.util.TreeSet;
-
-/**
- * Implementation used by {@link FenceWait} to wait for a {@link VisibilityFence}.
- */
-class WriteFence implements TransactionAware {
- private final byte[] fenceId;
- private Transaction tx;
- private Collection<byte[]> inProgressChanges;
-
- public WriteFence(byte[] fenceId) {
- this.fenceId = fenceId;
- }
-
- @Override
- public void startTx(Transaction tx) {
- this.tx = tx;
- if (inProgressChanges == null) {
- inProgressChanges = new TreeSet<>(UnsignedBytes.lexicographicalComparator());
- for (long inProgressTx : tx.getInProgress()) {
- inProgressChanges.add(Bytes.concat(fenceId, Longs.toByteArray(inProgressTx)));
- }
- }
- }
-
- @Override
- public void updateTx(Transaction tx) {
- // Fences only need original transaction
- }
-
- @Override
- public Collection<byte[]> getTxChanges() {
- if (inProgressChanges == null || tx == null) {
- throw new IllegalStateException("Transaction has not started yet");
- }
- return inProgressChanges;
- }
-
- @Override
- public boolean commitTx() throws Exception {
- // Nothing to persist
- return true;
- }
-
- @Override
- public void postTxCommit() {
- tx = null;
- }
-
- @Override
- public boolean rollbackTx() throws Exception {
- // Nothing to rollback
- return true;
- }
-
- @Override
- public String getTransactionAwareName() {
- return getClass().getSimpleName();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicACLData.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicACLData.java b/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicACLData.java
deleted file mode 100644
index ceaffad..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicACLData.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.zookeeper;
-
-import org.apache.twill.zookeeper.ACLData;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-
-import java.util.List;
-
-/**
- * A straightforward implementation of {@link ACLData}.
- */
-final class BasicACLData implements ACLData {
-
- private final List<ACL> acl;
- private final Stat stat;
-
- BasicACLData(List<ACL> acl, Stat stat) {
- this.acl = acl;
- this.stat = stat;
- }
-
- @Override
- public List<ACL> getACL() {
- return acl;
- }
-
- @Override
- public Stat getStat() {
- return stat;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeChildren.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeChildren.java b/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeChildren.java
deleted file mode 100644
index ce81ade..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeChildren.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.zookeeper;
-
-import com.google.common.base.Objects;
-import org.apache.twill.zookeeper.NodeChildren;
-import org.apache.zookeeper.data.Stat;
-
-import java.util.List;
-
-/**
- * Implementation of the {@link NodeChildren}.
- */
-final class BasicNodeChildren implements NodeChildren {
-
- private final Stat stat;
- private final List<String> children;
-
- BasicNodeChildren(List<String> children, Stat stat) {
- this.stat = stat;
- this.children = children;
- }
-
- @Override
- public Stat getStat() {
- return stat;
- }
-
- @Override
- public List<String> getChildren() {
- return children;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || !(o instanceof NodeChildren)) {
- return false;
- }
-
- NodeChildren that = (NodeChildren) o;
- return stat.equals(that.getStat()) && children.equals(that.getChildren());
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(children, stat);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeData.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeData.java b/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeData.java
deleted file mode 100644
index b09ff4b..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/zookeeper/BasicNodeData.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.zookeeper;
-
-import com.google.common.base.Objects;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.zookeeper.data.Stat;
-
-import java.util.Arrays;
-
-/**
- * A straightforward implementation for {@link NodeData}.
- */
-final class BasicNodeData implements NodeData {
-
- private final byte[] data;
- private final Stat stat;
-
- BasicNodeData(byte[] data, Stat stat) {
- this.data = data;
- this.stat = stat;
- }
-
- @Override
- public Stat getStat() {
- return stat;
- }
-
- @Override
- public byte[] getData() {
- return data;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || !(o instanceof NodeData)) {
- return false;
- }
-
- BasicNodeData that = (BasicNodeData) o;
-
- return stat.equals(that.getStat()) && Arrays.equals(data, that.getData());
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(data, stat);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/zookeeper/TephraZKClientService.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/zookeeper/TephraZKClientService.java b/tephra-core/src/main/java/co/cask/tephra/zookeeper/TephraZKClientService.java
deleted file mode 100644
index 5f6f565..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/zookeeper/TephraZKClientService.java
+++ /dev/null
@@ -1,627 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.zookeeper;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.util.concurrent.AbstractService;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import org.apache.twill.common.Cancellable;
-import org.apache.twill.common.Threads;
-import org.apache.twill.internal.zookeeper.SettableOperationFuture;
-import org.apache.twill.zookeeper.ACLData;
-import org.apache.twill.zookeeper.NodeChildren;
-import org.apache.twill.zookeeper.NodeData;
-import org.apache.twill.zookeeper.OperationFuture;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.Nullable;
-
-/**
- * The implementation of {@link ZKClientService}.
- */
-public class TephraZKClientService extends AbstractService implements ZKClientService, Watcher {
-
- private static final Logger LOG = LoggerFactory.getLogger(TephraZKClientService.class);
-
- private final String zkStr;
- private final int sessionTimeout;
- private final List<Watcher> connectionWatchers;
- private final Multimap<String, byte[]> authInfos;
- private final AtomicReference<ZooKeeper> zooKeeper;
- private final Runnable stopTask;
- private ExecutorService eventExecutor;
-
- /**
- * Create a new instance.
- * @param zkStr zookeper connection string
- * @param sessionTimeout timeout in milliseconds
- * @param connectionWatcher watcher to set
- * @param authInfos authorization bytes
- */
- public TephraZKClientService(String zkStr, int sessionTimeout,
- Watcher connectionWatcher, Multimap<String, byte[]> authInfos) {
- this.zkStr = zkStr;
- this.sessionTimeout = sessionTimeout;
- this.connectionWatchers = new CopyOnWriteArrayList<>();
- this.authInfos = copyAuthInfo(authInfos);
- addConnectionWatcher(connectionWatcher);
-
- this.zooKeeper = new AtomicReference<>();
- this.stopTask = createStopTask();
- }
-
- @Override
- public Long getSessionId() {
- ZooKeeper zk = zooKeeper.get();
- return zk == null ? null : zk.getSessionId();
- }
-
- @Override
- public String getConnectString() {
- return zkStr;
- }
-
- @Override
- public Cancellable addConnectionWatcher(final Watcher watcher) {
- if (watcher == null) {
- return new Cancellable() {
- @Override
- public void cancel() {
- // No-op
- }
- };
- }
-
- // Invocation of connection watchers are already done inside the event thread,
- // hence no need to wrap the watcher again.
- connectionWatchers.add(watcher);
- return new Cancellable() {
- @Override
- public void cancel() {
- connectionWatchers.remove(watcher);
- }
- };
- }
-
- @Override
- public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) {
- return create(path, data, createMode, true);
- }
-
- @Override
- public OperationFuture<String> create(String path, @Nullable byte[] data,
- CreateMode createMode, boolean createParent) {
- return create(path, data, createMode, createParent, ZooDefs.Ids.OPEN_ACL_UNSAFE);
- }
-
- @Override
- public OperationFuture<String> create(String path, @Nullable byte[] data,
- CreateMode createMode, Iterable<ACL> acl) {
- return create(path, data, createMode, true, acl);
- }
-
- @Override
- public OperationFuture<Stat> exists(String path) {
- return exists(path, null);
- }
-
- @Override
- public OperationFuture<NodeChildren> getChildren(String path) {
- return getChildren(path, null);
- }
-
- @Override
- public OperationFuture<NodeData> getData(String path) {
- return getData(path, null);
- }
-
- @Override
- public OperationFuture<Stat> setData(String path, byte[] data) {
- return setData(path, data, -1);
- }
-
- @Override
- public OperationFuture<String> delete(String path) {
- return delete(path, -1);
- }
-
- @Override
- public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl) {
- return setACL(path, acl, -1);
- }
-
- @Override
- public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode,
- boolean createParent, Iterable<ACL> acl) {
- return doCreate(path, data, createMode, createParent, ImmutableList.copyOf(acl), false);
- }
-
- private OperationFuture<String> doCreate(final String path,
- @Nullable final byte[] data,
- final CreateMode createMode,
- final boolean createParent,
- final List<ACL> acl,
- final boolean ignoreNodeExists) {
- final SettableOperationFuture<String> createFuture = SettableOperationFuture.create(path, eventExecutor);
- getZooKeeper().create(path, data, acl, createMode, Callbacks.STRING, createFuture);
- if (!createParent) {
- return createFuture;
- }
-
- // If create parent is request, return a different future
- final SettableOperationFuture<String> result = SettableOperationFuture.create(path, eventExecutor);
- // Watch for changes in the original future
- Futures.addCallback(createFuture, new FutureCallback<String>() {
- @Override
- public void onSuccess(String path) {
- // Propagate if creation was successful
- result.set(path);
- }
-
- @Override
- public void onFailure(Throwable t) {
- // See if the failure can be handled
- if (updateFailureResult(t, result, path, ignoreNodeExists)) {
- return;
- }
- // Create the parent node
- String parentPath = getParent(path);
- if (parentPath.isEmpty()) {
- result.setException(t);
- return;
- }
- // Watch for parent creation complete. Parent is created with the unsafe ACL.
- Futures.addCallback(doCreate(parentPath, null, CreateMode.PERSISTENT,
- true, ZooDefs.Ids.OPEN_ACL_UNSAFE, true), new FutureCallback<String>() {
- @Override
- public void onSuccess(String parentPath) {
- // Create the requested path again
- Futures.addCallback(
- doCreate(path, data, createMode, false, acl, ignoreNodeExists), new FutureCallback<String>() {
- @Override
- public void onSuccess(String pathResult) {
- result.set(pathResult);
- }
-
- @Override
- public void onFailure(Throwable t) {
- // handle the failure
- updateFailureResult(t, result, path, ignoreNodeExists);
- }
- });
- }
-
- @Override
- public void onFailure(Throwable t) {
- result.setException(t);
- }
- });
- }
-
- /**
- * Updates the result future based on the given {@link Throwable}.
- * @param t Cause of the failure
- * @param result Future to be updated
- * @param path Request path for the operation
- * @return {@code true} if it is a failure, {@code false} otherwise.
- */
- private boolean updateFailureResult(Throwable t, SettableOperationFuture<String> result,
- String path, boolean ignoreNodeExists) {
- // Propagate if there is error
- if (!(t instanceof KeeperException)) {
- result.setException(t);
- return true;
- }
- KeeperException.Code code = ((KeeperException) t).code();
- // Node already exists, simply return success if it allows for ignoring node exists (for parent node creation).
- if (ignoreNodeExists && code == KeeperException.Code.NODEEXISTS) {
- // The requested path could be used because it only applies to non-sequential node
- result.set(path);
- return false;
- }
- if (code != KeeperException.Code.NONODE) {
- result.setException(t);
- return true;
- }
- return false;
- }
-
- /**
- * Gets the parent of the given path.
- * @param path Path for computing its parent
- * @return Parent of the given path, or empty string if the given path is the root path already.
- */
- private String getParent(String path) {
- String parentPath = path.substring(0, path.lastIndexOf('/'));
- return (parentPath.isEmpty() && !"/".equals(path)) ? "/" : parentPath;
- }
- });
-
- return result;
- }
-
- @Override
- public OperationFuture<Stat> exists(String path, Watcher watcher) {
- SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor);
- getZooKeeper().exists(path, wrapWatcher(watcher), Callbacks.STAT_NONODE, result);
- return result;
- }
-
- @Override
- public OperationFuture<NodeChildren> getChildren(String path, Watcher watcher) {
- SettableOperationFuture<NodeChildren> result = SettableOperationFuture.create(path, eventExecutor);
- getZooKeeper().getChildren(path, wrapWatcher(watcher), Callbacks.CHILDREN, result);
- return result;
- }
-
- @Override
- public OperationFuture<NodeData> getData(String path, Watcher watcher) {
- SettableOperationFuture<NodeData> result = SettableOperationFuture.create(path, eventExecutor);
- getZooKeeper().getData(path, wrapWatcher(watcher), Callbacks.DATA, result);
-
- return result;
- }
-
- @Override
- public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
- SettableOperationFuture<Stat> result = SettableOperationFuture.create(dataPath, eventExecutor);
- getZooKeeper().setData(dataPath, data, version, Callbacks.STAT, result);
- return result;
- }
-
- @Override
- public OperationFuture<String> delete(String deletePath, int version) {
- SettableOperationFuture<String> result = SettableOperationFuture.create(deletePath, eventExecutor);
- getZooKeeper().delete(deletePath, version, Callbacks.VOID, result);
- return result;
- }
-
- @Override
- public OperationFuture<ACLData> getACL(String path) {
- SettableOperationFuture<ACLData> result = SettableOperationFuture.create(path, eventExecutor);
- getZooKeeper().getACL(path, new Stat(), Callbacks.ACL, result);
- return result;
- }
-
- @Override
- public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl, int version) {
- SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor);
- getZooKeeper().setACL(path, ImmutableList.copyOf(acl), version, Callbacks.STAT, result);
- return result;
- }
-
- @Override
- public Supplier<ZooKeeper> getZooKeeperSupplier() {
- return new Supplier<ZooKeeper>() {
- @Override
- public ZooKeeper get() {
- return getZooKeeper();
- }
- };
- }
-
- @Override
- protected void doStart() {
- // A single thread executor for all events
- ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>(),
- Threads.createDaemonThreadFactory("zk-client-EventThread"));
- // Just discard the execution if the executor is closed
- executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
- eventExecutor = executor;
-
- try {
- zooKeeper.set(createZooKeeper());
- } catch (IOException e) {
- notifyFailed(e);
- }
- }
-
- @Override
- protected void doStop() {
- // Submit a task to the executor to make sure all pending events in the executor are fired before
- // transiting this Service into STOPPED state
- eventExecutor.submit(stopTask);
- eventExecutor.shutdown();
- }
-
- /**
- * @return Current {@link ZooKeeper} client.
- */
- private ZooKeeper getZooKeeper() {
- ZooKeeper zk = zooKeeper.get();
- Preconditions.checkArgument(zk != null, "Not connected to zooKeeper.");
- return zk;
- }
-
- /**
- * Wraps the given watcher to be called from the event executor.
- * @param watcher Watcher to be wrapped
- * @return The wrapped Watcher
- */
- private Watcher wrapWatcher(final Watcher watcher) {
- if (watcher == null) {
- return null;
- }
- return new Watcher() {
- @Override
- public void process(final WatchedEvent event) {
- if (eventExecutor.isShutdown()) {
- LOG.debug("Already shutdown. Discarding event: {}", event);
- return;
- }
- eventExecutor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- watcher.process(event);
- } catch (Throwable t) {
- LOG.error("Watcher throws exception.", t);
- }
- }
- });
- }
- };
- }
-
- /**
- * Creates a deep copy of the given authInfos multimap.
- */
- private Multimap<String, byte[]> copyAuthInfo(Multimap<String, byte[]> authInfos) {
- Multimap<String, byte[]> result = ArrayListMultimap.create();
-
- for (Map.Entry<String, byte[]> entry : authInfos.entries()) {
- byte[] info = entry.getValue();
- result.put(entry.getKey(), info == null ? null : Arrays.copyOf(info, info.length));
- }
-
- return result;
- }
-
- @Override
- public void process(WatchedEvent event) {
- State state = state();
- if (state == State.TERMINATED || state == State.FAILED) {
- return;
- }
-
- try {
- if (event.getState() == Event.KeeperState.SyncConnected && state == State.STARTING) {
- LOG.debug("Connected to ZooKeeper: {}", zkStr);
- notifyStarted();
- return;
- }
- if (event.getState() == Event.KeeperState.Expired) {
- LOG.info("ZooKeeper session expired: {}", zkStr);
-
- // When connection expired, simply reconnect again
- if (state != State.RUNNING) {
- return;
- }
- eventExecutor.submit(new Runnable() {
- @Override
- public void run() {
- // Only reconnect if the current state is running
- if (state() != State.RUNNING) {
- return;
- }
- try {
- LOG.info("Reconnect to ZooKeeper due to expiration: {}", zkStr);
- closeZooKeeper(zooKeeper.getAndSet(createZooKeeper()));
- } catch (IOException e) {
- notifyFailed(e);
- }
- }
- });
- }
- } finally {
- if (event.getType() == Event.EventType.None) {
- for (Watcher connectionWatcher : connectionWatchers) {
- connectionWatcher.process(event);
- }
- }
- }
- }
-
- /**
- * Creates a new ZooKeeper connection.
- */
- private ZooKeeper createZooKeeper() throws IOException {
- ZooKeeper zk = new ZooKeeper(zkStr, sessionTimeout, wrapWatcher(this));
- for (Map.Entry<String, byte[]> authInfo : authInfos.entries()) {
- zk.addAuthInfo(authInfo.getKey(), authInfo.getValue());
- }
- return zk;
- }
-
- /**
- * Closes the given {@link ZooKeeper} if it is not null. If there is InterruptedException,
- * it will get logged.
- */
- private void closeZooKeeper(@Nullable ZooKeeper zk) {
- try {
- if (zk != null) {
- zk.close();
- }
- } catch (InterruptedException e) {
- LOG.warn("Interrupted when closing ZooKeeper", e);
- Thread.currentThread().interrupt();
- }
- }
-
- /**
- * Creates a {@link Runnable} task that will get executed in the event executor for transiting this
- * Service into STOPPED state.
- */
- private Runnable createStopTask() {
- return new Runnable() {
- @Override
- public void run() {
- try {
- // Close the ZK connection in this task will make sure if there is ZK connection created
- // after doStop() was called but before this task has been executed is also closed.
- // It is possible to happen when the following sequence happens:
- //
- // 1. session expired, hence the expired event is triggered
- // 2. The reconnect task executed. With Service.state() == RUNNING, it creates a new ZK client
- // 3. Service.stop() gets called, Service.state() changed to STOPPING
- // 4. The new ZK client created from the reconnect thread update the zooKeeper with the new one
- closeZooKeeper(zooKeeper.getAndSet(null));
- notifyStopped();
- } catch (Exception e) {
- notifyFailed(e);
- }
- }
- };
- }
-
- /**
- * Collection of generic callbacks that simply reflect results into OperationFuture.
- */
- private static final class Callbacks {
- static final AsyncCallback.StringCallback STRING = new AsyncCallback.StringCallback() {
- @Override
- @SuppressWarnings("unchecked")
- public void processResult(int rc, String path, Object ctx, String name) {
- SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx;
- KeeperException.Code code = KeeperException.Code.get(rc);
- if (code == KeeperException.Code.OK) {
- result.set((name == null || name.isEmpty()) ? path : name);
- return;
- }
- result.setException(KeeperException.create(code, result.getRequestPath()));
- }
- };
-
- static final AsyncCallback.StatCallback STAT = new AsyncCallback.StatCallback() {
- @Override
- @SuppressWarnings("unchecked")
- public void processResult(int rc, String path, Object ctx, Stat stat) {
- SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx;
- KeeperException.Code code = KeeperException.Code.get(rc);
- if (code == KeeperException.Code.OK) {
- result.set(stat);
- return;
- }
- result.setException(KeeperException.create(code, result.getRequestPath()));
- }
- };
-
- /**
- * A stat callback that treats NONODE as success.
- */
- static final AsyncCallback.StatCallback STAT_NONODE = new AsyncCallback.StatCallback() {
- @Override
- @SuppressWarnings("unchecked")
- public void processResult(int rc, String path, Object ctx, Stat stat) {
- SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx;
- KeeperException.Code code = KeeperException.Code.get(rc);
- if (code == KeeperException.Code.OK || code == KeeperException.Code.NONODE) {
- result.set(stat);
- return;
- }
- result.setException(KeeperException.create(code, result.getRequestPath()));
- }
- };
-
- static final AsyncCallback.Children2Callback CHILDREN = new AsyncCallback.Children2Callback() {
- @Override
- @SuppressWarnings("unchecked")
- public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
- SettableOperationFuture<NodeChildren> result = (SettableOperationFuture<NodeChildren>) ctx;
- KeeperException.Code code = KeeperException.Code.get(rc);
- if (code == KeeperException.Code.OK) {
- result.set(new BasicNodeChildren(children, stat));
- return;
- }
- result.setException(KeeperException.create(code, result.getRequestPath()));
- }
- };
-
- static final AsyncCallback.DataCallback DATA = new AsyncCallback.DataCallback() {
- @Override
- @SuppressWarnings("unchecked")
- public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
- SettableOperationFuture<NodeData> result = (SettableOperationFuture<NodeData>) ctx;
- KeeperException.Code code = KeeperException.Code.get(rc);
- if (code == KeeperException.Code.OK) {
- result.set(new BasicNodeData(data, stat));
- return;
- }
- result.setException(KeeperException.create(code, result.getRequestPath()));
- }
- };
-
- static final AsyncCallback.VoidCallback VOID = new AsyncCallback.VoidCallback() {
- @Override
- @SuppressWarnings("unchecked")
- public void processResult(int rc, String path, Object ctx) {
- SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx;
- KeeperException.Code code = KeeperException.Code.get(rc);
- if (code == KeeperException.Code.OK) {
- result.set(result.getRequestPath());
- return;
- }
- // Otherwise, it is an error
- result.setException(KeeperException.create(code, result.getRequestPath()));
- }
- };
-
- static final AsyncCallback.ACLCallback ACL = new AsyncCallback.ACLCallback() {
- @Override
- @SuppressWarnings("unchecked")
- public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) {
- SettableOperationFuture<ACLData> result = (SettableOperationFuture<ACLData>) ctx;
- KeeperException.Code code = KeeperException.Code.get(rc);
- if (code == KeeperException.Code.OK) {
- result.set(new BasicACLData(acl, stat));
- return;
- }
- result.setException(KeeperException.create(code, result.getRequestPath()));
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionAwareTable.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionAwareTable.java b/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionAwareTable.java
new file mode 100644
index 0000000..64fd7ed
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionAwareTable.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Objects;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Bytes;
+import com.google.common.primitives.UnsignedBytes;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Base class for all the common parts of the HBase version-specific {@code TransactionAwareHTable}
+ * implementations.
+ */
+public abstract class AbstractTransactionAwareTable implements TransactionAware {
+ protected final TransactionCodec txCodec;
+ // map of write pointers to change set associated with each
+ protected final Map<Long, Set<ActionChange>> changeSets;
+ protected final TxConstants.ConflictDetection conflictLevel;
+ protected Transaction tx;
+ protected boolean allowNonTransactional;
+
+ public AbstractTransactionAwareTable(TxConstants.ConflictDetection conflictLevel, boolean allowNonTransactional) {
+ this.conflictLevel = conflictLevel;
+ this.allowNonTransactional = allowNonTransactional;
+ this.txCodec = new TransactionCodec();
+ this.changeSets = Maps.newHashMap();
+ }
+
+ /**
+ * Returns whether this instance allows non-transactional operations.
+ * @return {@code true} if non-transactional operations are allowed, {@code false} otherwise
+ */
+ public boolean getAllowNonTransactional() {
+ return this.allowNonTransactional;
+ }
+
+ /**
+ * Sets whether this instance allows non-transactional operations.
+ * @param allowNonTransactional {@code true} to allow non-transactional operations, {@code false} to disallow them
+ */
+ public void setAllowNonTransactional(boolean allowNonTransactional) {
+ this.allowNonTransactional = allowNonTransactional;
+ }
+
+ @Override
+ public void startTx(Transaction tx) {
+ this.tx = tx;
+ }
+
+ @Override
+ public void updateTx(Transaction tx) {
+ this.tx = tx;
+ }
+
+ @Override
+ public Collection<byte[]> getTxChanges() {
+ if (conflictLevel == TxConstants.ConflictDetection.NONE) {
+ return Collections.emptyList();
+ }
+
+ Collection<byte[]> txChanges = new TreeSet<byte[]>(UnsignedBytes.lexicographicalComparator());
+ for (Set<ActionChange> changeSet : changeSets.values()) {
+ for (ActionChange change : changeSet) {
+ txChanges.add(getChangeKey(change.getRow(), change.getFamily(), change.getQualifier()));
+ }
+ }
+ return txChanges;
+ }
+
+ public byte[] getChangeKey(byte[] row, byte[] family, byte[] qualifier) {
+ byte[] key;
+ switch (conflictLevel) {
+ case ROW:
+ key = Bytes.concat(getTableKey(), row);
+ break;
+ case COLUMN:
+ key = Bytes.concat(getTableKey(), row, family, qualifier);
+ break;
+ case NONE:
+ throw new IllegalStateException("NONE conflict detection does not support change keys");
+ default:
+ throw new IllegalStateException("Unknown conflict detection level: " + conflictLevel);
+ }
+ return key;
+ }
+
+ @Override
+ public boolean commitTx() throws Exception {
+ return doCommit();
+ }
+
+ /**
+ * Commits any pending writes by flushing the wrapped {@code HTable} instance.
+ */
+ protected abstract boolean doCommit() throws IOException;
+
+ @Override
+ public void postTxCommit() {
+ tx = null;
+ changeSets.clear();
+ }
+
+ @Override
+ public String getTransactionAwareName() {
+ return new String(getTableKey(), Charsets.UTF_8);
+ }
+
+ /**
+ * Returns the table name to use as a key prefix for the transaction change set.
+ */
+ protected abstract byte[] getTableKey();
+
+ @Override
+ public boolean rollbackTx() throws Exception {
+ return doRollback();
+ }
+
+ /**
+ * Rolls back any persisted changes from the transaction by issuing offsetting deletes to the
+ * wrapped {@code HTable} instance. How this is handled will depend on the delete API exposed
+ * by the specific version of HBase.
+ */
+ protected abstract boolean doRollback() throws Exception;
+
+ protected void addToChangeSet(byte[] row, byte[] family, byte[] qualifier) {
+ long currentWritePointer = tx.getWritePointer();
+ Set<ActionChange> changeSet = changeSets.get(currentWritePointer);
+ if (changeSet == null) {
+ changeSet = Sets.newHashSet();
+ changeSets.put(currentWritePointer, changeSet);
+ }
+ switch (conflictLevel) {
+ case ROW:
+ case NONE:
+ // with ROW or NONE conflict detection, we still need to track changes per-family, since this
+ // is the granularity at which we will issue deletes for rollback
+ changeSet.add(new ActionChange(row, family));
+ break;
+ case COLUMN:
+ changeSet.add(new ActionChange(row, family, qualifier));
+ break;
+ default:
+ throw new IllegalStateException("Unknown conflict detection level: " + conflictLevel);
+ }
+ }
+
+ /**
+ * Record of each transaction that causes a change. This reference is used to rollback
+ * any operation upon failure.
+ */
+ protected class ActionChange {
+ private final byte[] row;
+ private final byte[] family;
+ private final byte[] qualifier;
+
+ public ActionChange(byte[] row, byte[] family) {
+ this(row, family, null);
+ }
+
+ public ActionChange(byte[] row, byte[] family, byte[] qualifier) {
+ this.row = row;
+ this.family = family;
+ this.qualifier = qualifier;
+ }
+
+ public byte[] getRow() {
+ return row;
+ }
+
+ public byte[] getFamily() {
+ return family;
+ }
+
+ public byte[] getQualifier() {
+ return qualifier;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (o == null || o.getClass() != this.getClass()) {
+ return false;
+ }
+
+ // Compare array contents, not references, to stay consistent with hashCode().
+ ActionChange other = (ActionChange) o;
+ return Arrays.equals(this.row, other.row) &&
+ Arrays.equals(this.family, other.family) &&
+ Arrays.equals(this.qualifier, other.qualifier);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Arrays.hashCode(row);
+ result = 31 * result + Arrays.hashCode(family);
+ result = 31 * result + Arrays.hashCode(qualifier);
+ return result;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionExecutor.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionExecutor.java b/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionExecutor.java
new file mode 100644
index 0000000..528085f
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/AbstractTransactionExecutor.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Provides implementation of asynchronous methods of {@link TransactionExecutor} by delegating their execution
+ * to respective synchronous methods via provided {@link ExecutorService}.
+ */
+public abstract class AbstractTransactionExecutor implements TransactionExecutor {
+ private final ListeningExecutorService executorService;
+
+ protected AbstractTransactionExecutor(ExecutorService executorService) {
+ this.executorService = MoreExecutors.listeningDecorator(executorService);
+ }
+
+ @Override
+ public <I, O> O executeUnchecked(Function<I, O> function, I input) {
+ try {
+ return execute(function, input);
+ } catch (TransactionFailureException e) {
+ throw Throwables.propagate(e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public <I> void executeUnchecked(Procedure<I> procedure, I input) {
+ try {
+ execute(procedure, input);
+ } catch (TransactionFailureException e) {
+ throw Throwables.propagate(e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public <O> O executeUnchecked(Callable<O> callable) {
+ try {
+ return execute(callable);
+ } catch (TransactionFailureException e) {
+ throw Throwables.propagate(e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public void executeUnchecked(Subroutine subroutine) {
+ try {
+ execute(subroutine);
+ } catch (TransactionFailureException e) {
+ throw Throwables.propagate(e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw Throwables.propagate(e);
+ }
+ }
+
+ @Override
+ public <I, O> ListenableFuture<O> submit(final Function<I, O> function, final I input) {
+ return executorService.submit(new Callable<O>() {
+ @Override
+ public O call() throws Exception {
+ return execute(function, input);
+ }
+ });
+ }
+
+ @Override
+ public <I> ListenableFuture<?> submit(final Procedure<I> procedure, final I input) {
+ return executorService.submit(new Callable<Object>() {
+ @Override
+ public I call() throws Exception {
+ execute(procedure, input);
+ return null;
+ }
+ });
+ }
+
+ @Override
+ public <O> ListenableFuture<O> submit(final Callable<O> callable) {
+ return executorService.submit(new Callable<O>() {
+ @Override
+ public O call() throws Exception {
+ return execute(callable);
+ }
+ });
+ }
+
+ @Override
+ public ListenableFuture<?> submit(final Subroutine subroutine) {
+ return executorService.submit(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ execute(subroutine);
+ return null;
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/ChangeId.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/ChangeId.java b/tephra-core/src/main/java/org/apache/tephra/ChangeId.java
new file mode 100644
index 0000000..0eb7191
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/ChangeId.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import java.util.Arrays;
+
+/**
+ * Represents a row key from a data set changed as part of a transaction.
+ */
+public final class ChangeId {
+ private final byte[] key;
+ private final int hash;
+
+ public ChangeId(byte[] bytes) {
+ key = bytes;
+ hash = Arrays.hashCode(bytes);
+ }
+
+ public byte[] getKey() {
+ return key;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (o == null || o.getClass() != ChangeId.class) {
+ return false;
+ }
+ ChangeId other = (ChangeId) o;
+ return hash == other.hash && Arrays.equals(key, other.key);
+ }
+
+ @Override
+ public int hashCode() {
+ return hash;
+ }
+
+ @Override
+ public String toString() {
+ return toStringBinary(key, 0, key.length);
+ }
+
+ // Copy from Bytes.toStringBinary so that we don't need direct dependencies on Bytes.
+ private String toStringBinary(byte [] b, int off, int len) {
+ StringBuilder result = new StringBuilder();
+ for (int i = off; i < off + len; ++i) {
+ int ch = b[i] & 0xFF;
+ if ((ch >= '0' && ch <= '9')
+ || (ch >= 'A' && ch <= 'Z')
+ || (ch >= 'a' && ch <= 'z')
+ || " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0) {
+ result.append((char) ch);
+ } else {
+ result.append(String.format("\\x%02X", ch));
+ }
+ }
+ return result.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/DefaultTransactionExecutor.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/DefaultTransactionExecutor.java b/tephra-core/src/main/java/org/apache/tephra/DefaultTransactionExecutor.java
new file mode 100644
index 0000000..c5e1cb3
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/DefaultTransactionExecutor.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Utility class that encapsulates the transaction life cycle over a given set of
+ * transaction-aware datasets. The executor can be reused across multiple invocations
+ * of the execute() method. However, it is not thread-safe for concurrent execution.
+ * <p>
+ * Transaction execution will be retries according to specified in constructor {@link RetryStrategy}.
+ * By default {@link RetryOnConflictStrategy} is used with max 20 retries and 100 ms between retries.
+ * </p>
+ */
+public class DefaultTransactionExecutor extends AbstractTransactionExecutor {
+
+ private final Collection<TransactionAware> txAwares;
+ private final TransactionSystemClient txClient;
+ private final RetryStrategy retryStrategy;
+
+ /**
+ * Convenience constructor, has same affect as {@link #DefaultTransactionExecutor(TransactionSystemClient, Iterable)}
+ */
+ public DefaultTransactionExecutor(TransactionSystemClient txClient, TransactionAware... txAwares) {
+ this(txClient, Arrays.asList(txAwares));
+ }
+
+
+ public DefaultTransactionExecutor(TransactionSystemClient txClient,
+ Iterable<TransactionAware> txAwares,
+ RetryStrategy retryStrategy) {
+
+ super(MoreExecutors.sameThreadExecutor());
+ this.txAwares = ImmutableList.copyOf(txAwares);
+ this.txClient = txClient;
+ this.retryStrategy = retryStrategy;
+ }
+
+ /**
+ * Constructor for a transaction executor.
+ */
+ @Inject
+ public DefaultTransactionExecutor(TransactionSystemClient txClient, @Assisted Iterable<TransactionAware> txAwares) {
+ this(txClient, txAwares, RetryStrategies.retryOnConflict(20, 100));
+ }
+
+ @Override
+ public <I, O> O execute(Function<I, O> function, I input) throws TransactionFailureException, InterruptedException {
+ return executeWithRetry(function, input);
+ }
+
+ @Override
+ public <I> void execute(final Procedure<I> procedure, I input)
+ throws TransactionFailureException, InterruptedException {
+
+ execute(new Function<I, Void>() {
+ @Override
+ public Void apply(I input) throws Exception {
+ procedure.apply(input);
+ return null;
+ }
+ }, input);
+ }
+
+ @Override
+ public <O> O execute(final Callable<O> callable) throws TransactionFailureException, InterruptedException {
+ return execute(new Function<Void, O>() {
+ @Override
+ public O apply(Void input) throws Exception {
+ return callable.call();
+ }
+ }, null);
+ }
+
+ @Override
+ public void execute(final Subroutine subroutine) throws TransactionFailureException, InterruptedException {
+ execute(new Function<Void, Void>() {
+ @Override
+ public Void apply(Void input) throws Exception {
+ subroutine.apply();
+ return null;
+ }
+ }, null);
+ }
+
+ private <I, O> O executeWithRetry(Function<I, O> function, I input)
+ throws TransactionFailureException, InterruptedException {
+
+ int retries = 0;
+ while (true) {
+ try {
+ return executeOnce(function, input);
+ } catch (TransactionFailureException e) {
+ long delay = retryStrategy.nextRetry(e, ++retries);
+
+ if (delay < 0) {
+ throw e;
+ }
+
+ if (delay > 0) {
+ TimeUnit.MILLISECONDS.sleep(delay);
+ }
+ }
+ }
+
+ }
+
+ private <I, O> O executeOnce(Function<I, O> function, I input) throws TransactionFailureException {
+ TransactionContext txContext = new TransactionContext(txClient, txAwares);
+ txContext.start();
+ O o = null;
+ try {
+ o = function.apply(input);
+ } catch (Throwable e) {
+ txContext.abort(new TransactionFailureException("Transaction function failure for transaction. ", e));
+ // abort will throw
+ }
+ // will throw if smth goes wrong
+ txContext.finish();
+ return o;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/InvalidTruncateTimeException.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/InvalidTruncateTimeException.java b/tephra-core/src/main/java/org/apache/tephra/InvalidTruncateTimeException.java
new file mode 100644
index 0000000..1cdbfb0
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/InvalidTruncateTimeException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+/**
+ * Thrown when truncate invalid list is called with a time, and when there are in-progress transactions that
+ * were started before the given time.
+ */
+public class InvalidTruncateTimeException extends Exception {
+ public InvalidTruncateTimeException(String s) {
+ super(s);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/NoRetryStrategy.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/NoRetryStrategy.java b/tephra-core/src/main/java/org/apache/tephra/NoRetryStrategy.java
new file mode 100644
index 0000000..4dc245d
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/NoRetryStrategy.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+/**
+ * Does no retries
+ */
+public class NoRetryStrategy implements RetryStrategy {
+ public static final RetryStrategy INSTANCE = new NoRetryStrategy();
+
+ private NoRetryStrategy() {}
+
+ @Override
+ public long nextRetry(TransactionFailureException reason, int failureCount) {
+ return -1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/RetryOnConflictStrategy.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/RetryOnConflictStrategy.java b/tephra-core/src/main/java/org/apache/tephra/RetryOnConflictStrategy.java
new file mode 100644
index 0000000..82f069f
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/RetryOnConflictStrategy.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+/**
+ * Retries transaction execution when transaction fails with {@link TransactionConflictException}.
+ */
+public class RetryOnConflictStrategy implements RetryStrategy {
+ private final int maxRetries;
+ private final long retryDelay;
+
+ public RetryOnConflictStrategy(int maxRetries, long retryDelay) {
+ this.maxRetries = maxRetries;
+ this.retryDelay = retryDelay;
+ }
+
+ @Override
+ public long nextRetry(TransactionFailureException reason, int failureCount) {
+ if (reason instanceof TransactionConflictException) {
+ return failureCount > maxRetries ? -1 : retryDelay;
+ } else {
+ return -1;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/RetryStrategies.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/RetryStrategies.java b/tephra-core/src/main/java/org/apache/tephra/RetryStrategies.java
new file mode 100644
index 0000000..9550d77
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/RetryStrategies.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra;
+
+/**
+ * Collection of {@link RetryStrategy}s.
+ */
+public final class RetryStrategies {
+ private RetryStrategies() {}
+
+ /**
+ * @param maxRetries max number of retries
+ * @param delayInMs delay between retries in milliseconds
+ * @return RetryStrategy that retries transaction execution when transaction fails with
+ * {@link TransactionConflictException}
+ */
+ public static RetryStrategy retryOnConflict(int maxRetries, long delayInMs) {
+ return new RetryOnConflictStrategy(maxRetries, delayInMs);
+ }
+
+ public static RetryStrategy noRetries() {
+ return NoRetryStrategy.INSTANCE;
+ }
+}