You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/06 23:02:37 UTC
[31/51] [partial] incubator-tephra git commit: Rename package to
org.apache.tephra
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersionSpecificFactory.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersionSpecificFactory.java b/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersionSpecificFactory.java
new file mode 100644
index 0000000..ebbbd18
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersionSpecificFactory.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.util;
+
+import com.google.inject.Provider;
+import com.google.inject.ProvisionException;
+import org.apache.twill.internal.utils.Instances;
+
+/**
+ * Common class factory behavior for classes which need specific implementations depending on HBase versions.
+ * Specific factories can subclass this class and simply plug in the class names for their implementations.
+ *
+ * @param <T> Version specific class provided by this factory.
+ */
+public abstract class HBaseVersionSpecificFactory<T> implements Provider<T> {
+ @Override
+ public T get() {
+ T instance = null;
+ try {
+ switch (HBaseVersion.get()) {
+ case HBASE_94:
+ throw new ProvisionException("HBase 0.94 is no longer supported. Please upgrade to HBase 0.96 or newer.");
+ case HBASE_96:
+ instance = createInstance(getHBase96Classname());
+ break;
+ case HBASE_98:
+ instance = createInstance(getHBase98Classname());
+ break;
+ case HBASE_10:
+ instance = createInstance(getHBase10Classname());
+ break;
+ case HBASE_10_CDH:
+ instance = createInstance(getHBase10CDHClassname());
+ break;
+ case HBASE_11:
+ case HBASE_12_CDH:
+ instance = createInstance(getHBase11Classname());
+ break;
+ case UNKNOWN:
+ throw new ProvisionException("Unknown HBase version: " + HBaseVersion.getVersionString());
+ }
+ } catch (ClassNotFoundException cnfe) {
+ throw new ProvisionException(cnfe.getMessage(), cnfe);
+ }
+ return instance;
+ }
+
+ protected T createInstance(String className) throws ClassNotFoundException {
+ Class clz = Class.forName(className);
+ return (T) Instances.newInstance(clz);
+ }
+
+ protected abstract String getHBase96Classname();
+ protected abstract String getHBase98Classname();
+ protected abstract String getHBase10Classname();
+ protected abstract String getHBase10CDHClassname();
+ protected abstract String getHBase11Classname();
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
new file mode 100644
index 0000000..08b1545
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.util;
+
+import com.google.common.primitives.Longs;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.persist.TransactionVisibilityState;
+
+import java.util.Map;
+
+/**
+ * Utility methods supporting transaction operations.
+ */
+public class TxUtils {
+
+  // Any cell with timestamp less than MAX_NON_TX_TIMESTAMP is assumed to be pre-existing data,
+  // i.e. data written before table was converted into transactional table using Tephra.
+  // Using 1.1 times current time to determine whether a timestamp is transactional timestamp or not is safe, and does
+  // not produce any false positives or false negatives.
+  //
+  // To prove this, let's say the earliest transactional timestamp written by Tephra was in year 2000, and the latest
+  // that will be written is in year 2200.
+  // 01-Jan-2000 GMT is 946684800000 milliseconds since epoch.
+  // 31-Dec-2200 GMT is 7289654399000 milliseconds since epoch.
+  //
+  // Let's say that we enabled transactions on a table on 01-Jan-2000, then 1.1 * 946684800000 = 31-Dec-2002. Using
+  // 31-Dec-2002, we can safely say from 01-Jan-2000 onwards, whether a cell timestamp is
+  // non-transactional (<= 946684800000).
+  // Note that transactional timestamp will be greater than 946684800000000000 (> year 31969) at this instant.
+  //
+  // On the other end, let's say we enabled transactions on a table on 31-Dec-2200,
+  // then 1.1 * 7289654399000 = 07-Feb-2224. Again, we can use this time from 31-Dec-2200 onwards to say whether a
+  // cell timestamp is non-transactional (<= 7289654399000).
+  // Note that transactional timestamp will be greater than 7289654399000000000 (> year 232969) at this instant.
+  private static final long MAX_NON_TX_TIMESTAMP = (long) (System.currentTimeMillis() * 1.1);
+
+  /**
+   * Returns the oldest visible timestamp for the given transaction, based on the TTLs configured for each column
+   * family. If any column family has no TTL (or {@code ttlByFamily} is empty), the oldest visible timestamp will be
+   * {@code 0}.
+   * <p>
+   * The result is never negative: if the TTL reaches back past timestamp {@code 0}, or is so large that scaling it
+   * by {@code TxConstants.MAX_TX_PER_MS} would overflow, {@code 0} is returned.
+   *
+   * @param ttlByFamily A map of column family name to TTL value (in milliseconds); non-positive values mean no TTL
+   * @param tx The current transaction
+   * @return The oldest timestamp that will be visible for the given transaction and TTL configuration
+   */
+  public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Transaction tx) {
+    long maxTTL = getMaxTTL(ttlByFamily);
+    // we know that data will not be cleaned up while this tx is running up to this point as janitor uses it
+    return maxTTL < Long.MAX_VALUE
+      ? subtractClamped(tx.getVisibilityUpperBound(), maxTTL, TxConstants.MAX_TX_PER_MS)
+      : 0;
+  }
+
+  /**
+   * Returns the oldest visible timestamp for the given transaction, based on the TTLs configured for each column
+   * family. If any column family has no TTL (or {@code ttlByFamily} is empty), the oldest visible timestamp will be
+   * {@code 0}. The result is never negative.
+   *
+   * @param ttlByFamily A map of column family name to TTL value (in milliseconds); non-positive values mean no TTL
+   * @param tx The current transaction
+   * @param readNonTxnData indicates that the timestamp returned should allow reading non-transactional data
+   * @return The oldest timestamp that will be visible for the given transaction and TTL configuration
+   */
+  public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Transaction tx, boolean readNonTxnData) {
+    if (readNonTxnData) {
+      // Non-transactional cells carry plain millisecond timestamps, so the TTL is applied to wall-clock time unscaled.
+      long maxTTL = getMaxTTL(ttlByFamily);
+      return maxTTL < Long.MAX_VALUE ? subtractClamped(System.currentTimeMillis(), maxTTL, 1) : 0;
+    }
+
+    return getOldestVisibleTimestamp(ttlByFamily, tx);
+  }
+
+  /**
+   * Returns the maximum timestamp to use for time-range operations, based on the given transaction.
+   * @param tx The current transaction
+   * @return The maximum timestamp (exclusive) to use for time-range operations
+   */
+  public static long getMaxVisibleTimestamp(Transaction tx) {
+    // NOTE: +1 here because we want read up to writepointer inclusive, but timerange's end is exclusive
+    // however, we also need to guard against overflow in the case write pointer is set to MAX_VALUE
+    return tx.getWritePointer() < Long.MAX_VALUE ?
+      tx.getWritePointer() + 1 : tx.getWritePointer();
+  }
+
+  /**
+   * Creates a "dummy" transaction based on the given txVisibilityState's state. This is not a "real" transaction in
+   * the sense that it has not been started, data should not be written with it, and it cannot be committed. However,
+   * this can still be useful for filtering data according to the txVisibilityState's state. Instead of the actual
+   * write pointer from the txVisibilityState, however, we use {@code Long.MAX_VALUE} to avoid mis-identifying any cells
+   * as being written by this transaction (and therefore visible).
+   *
+   * @param txVisibilityState source of the read pointer, invalid list and in-progress transactions
+   * @return a {@link TransactionType#SHORT} transaction snapshot reflecting {@code txVisibilityState}
+   */
+  public static Transaction createDummyTransaction(TransactionVisibilityState txVisibilityState) {
+    return new Transaction(txVisibilityState.getReadPointer(), Long.MAX_VALUE,
+                           Longs.toArray(txVisibilityState.getInvalid()),
+                           Longs.toArray(txVisibilityState.getInProgress().keySet()),
+                           TxUtils.getFirstShortInProgress(txVisibilityState.getInProgress()), TransactionType.SHORT);
+  }
+
+  /**
+   * Returns the write pointer for the first "short" transaction that is in the in-progress set, or
+   * {@link Transaction#NO_TX_IN_PROGRESS} if none. "First" follows the iteration order of {@code inProgress}.
+   *
+   * @param inProgress map of write pointer to in-progress transaction state
+   * @return the key of the first entry whose transaction is not long-running, or
+   *         {@link Transaction#NO_TX_IN_PROGRESS}
+   */
+  public static long getFirstShortInProgress(Map<Long, TransactionManager.InProgressTx> inProgress) {
+    long firstShort = Transaction.NO_TX_IN_PROGRESS;
+    for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) {
+      if (!entry.getValue().isLongRunning()) {
+        firstShort = entry.getKey();
+        break;
+      }
+    }
+    return firstShort;
+  }
+
+  /**
+   * Returns the timestamp for calculating time to live for the given cell timestamp.
+   * This takes into account pre-existing non-transactional cells while calculating the time: their millisecond
+   * timestamps are scaled by {@code TxConstants.MAX_TX_PER_MS}; other timestamps are returned unchanged.
+   */
+  public static long getTimestampForTTL(long cellTs) {
+    return isPreExistingVersion(cellTs) ? cellTs * TxConstants.MAX_TX_PER_MS : cellTs;
+  }
+
+  /**
+   * Returns the max TTL for the given TTL values. Returns Long.MAX_VALUE if any of the column families has no TTL set
+   * (a non-positive TTL value), or if the map is empty.
+   */
+  private static long getMaxTTL(Map<byte[], Long> ttlByFamily) {
+    long maxTTL = 0;
+    for (Long familyTTL : ttlByFamily.values()) {
+      maxTTL = Math.max(familyTTL <= 0 ? Long.MAX_VALUE : familyTTL, maxTTL);
+    }
+    return maxTTL == 0 ? Long.MAX_VALUE : maxTTL;
+  }
+
+  /**
+   * Returns {@code upperBound - ttl * scale}, or {@code 0} if that difference would be negative or if
+   * {@code ttl * scale} would overflow. Overflow would otherwise wrap around to an arbitrary (possibly huge) lower
+   * bound and hide live data. Assuming cell timestamps are non-negative, a lower bound of {@code 0} admits exactly
+   * the same cells as a negative bound would, and it stays a valid time-range start.
+   * Expects {@code ttl >= 0} and {@code scale > 0}.
+   */
+  private static long subtractClamped(long upperBound, long ttl, long scale) {
+    // ttl <= upperBound / scale guarantees ttl * scale <= upperBound: no overflow and a non-negative result.
+    if (upperBound <= 0 || ttl > upperBound / scale) {
+      return 0;
+    }
+    return upperBound - ttl * scale;
+  }
+
+  /**
+   * Returns true if version was written before table was converted into transactional table, false otherwise.
+   */
+  public static boolean isPreExistingVersion(long version) {
+    return version < MAX_NON_TX_TIMESTAMP;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/visibility/DefaultFenceWait.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/visibility/DefaultFenceWait.java b/tephra-core/src/main/java/org/apache/tephra/visibility/DefaultFenceWait.java
new file mode 100644
index 0000000..7c1af8e
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/visibility/DefaultFenceWait.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.visibility;
+
+import com.google.common.base.Stopwatch;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionFailureException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Default implementation of {@link FenceWait}.
+ */
+public class DefaultFenceWait implements FenceWait {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultFenceWait.class);
+
+ private final TransactionContext txContext;
+
+ DefaultFenceWait(TransactionContext txContext) {
+ this.txContext = txContext;
+ }
+
+ @Override
+ public void await(long timeout, TimeUnit timeUnit)
+ throws TransactionFailureException, InterruptedException, TimeoutException {
+ Stopwatch stopwatch = new Stopwatch();
+ stopwatch.start();
+ long sleepTimeMicros = timeUnit.toMicros(timeout) / 10;
+ // Have sleep time to be within 1 microsecond and 500 milliseconds
+ sleepTimeMicros = Math.max(Math.min(sleepTimeMicros, 500 * 1000), 1);
+ while (stopwatch.elapsedTime(timeUnit) < timeout) {
+ txContext.start();
+ try {
+ txContext.finish();
+ return;
+ } catch (TransactionFailureException e) {
+ LOG.error("Got exception waiting for fence. Sleeping for {} microseconds", sleepTimeMicros, e);
+ txContext.abort();
+ TimeUnit.MICROSECONDS.sleep(sleepTimeMicros);
+ }
+ }
+ throw new TimeoutException("Timeout waiting for fence");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/visibility/FenceWait.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/visibility/FenceWait.java b/tephra-core/src/main/java/org/apache/tephra/visibility/FenceWait.java
new file mode 100644
index 0000000..e1fb246
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/visibility/FenceWait.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.visibility;
+
+import org.apache.tephra.TransactionFailureException;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Used by a writer to wait on a fence so that changes are visible to all readers with in-progress transactions.
+ */
+public interface FenceWait {
+  /**
+   * Waits until the fence is complete, or until the specified timeout elapses. The fence wait transaction will be
+   * retried several times until the timeout.
+   * <p>
+   *
+   * If a fence wait times out then it means there are still some readers with in-progress transactions that have not
+   * seen the change. In this case the wait will have to be retried using the same FenceWait object.
+   *
+   * @param timeout Maximum time to wait
+   * @param timeUnit {@link TimeUnit} in which {@code timeout} is expressed
+   * @throws TransactionFailureException when not able to start fence wait transaction
+   * @throws InterruptedException on any interrupt
+   * @throws TimeoutException when timeout is reached
+   */
+  void await(long timeout, TimeUnit timeUnit)
+    throws TransactionFailureException, InterruptedException, TimeoutException;
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/visibility/ReadFence.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/visibility/ReadFence.java b/tephra-core/src/main/java/org/apache/tephra/visibility/ReadFence.java
new file mode 100644
index 0000000..a156d55
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/visibility/ReadFence.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.visibility;
+
+import com.google.common.primitives.Bytes;
+import com.google.common.primitives.Longs;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionAware;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Implementation of {@link VisibilityFence} used by reader.
+ */
+class ReadFence implements TransactionAware {
+ private final byte[] fenceId;
+ private Transaction tx;
+
+ public ReadFence(byte[] fenceId) {
+ this.fenceId = fenceId;
+ }
+
+ @Override
+ public void startTx(Transaction tx) {
+ this.tx = tx;
+ }
+
+ @Override
+ public void updateTx(Transaction tx) {
+ // Fences only need original transaction
+ }
+
+ @Override
+ public Collection<byte[]> getTxChanges() {
+ if (tx == null) {
+ throw new IllegalStateException("Transaction has not started yet");
+ }
+ return Collections.singleton(Bytes.concat(fenceId, Longs.toByteArray(tx.getTransactionId())));
+ }
+
+ @Override
+ public boolean commitTx() throws Exception {
+ // Nothing to persist
+ return true;
+ }
+
+ @Override
+ public void postTxCommit() {
+ tx = null;
+ }
+
+ @Override
+ public boolean rollbackTx() throws Exception {
+ // Nothing to rollback
+ return true;
+ }
+
+ @Override
+ public String getTransactionAwareName() {
+ return getClass().getSimpleName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/visibility/VisibilityFence.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/visibility/VisibilityFence.java b/tephra-core/src/main/java/org/apache/tephra/visibility/VisibilityFence.java
new file mode 100644
index 0000000..5d08246
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/visibility/VisibilityFence.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.visibility;
+
+import org.apache.tephra.TransactionAware;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionFailureException;
+import org.apache.tephra.TransactionSystemClient;
+
+import java.util.concurrent.TimeoutException;
+
+/**
+ * VisibilityFence is used to ensure that after a given point in time, all readers see an updated change
+ * that got committed.
+ * <p>
+ *
+ * Typically a reader will never conflict with a writer, since a reader only sees changes that were committed before
+ * its transaction started. However, to ensure that after a given point all readers are aware of a change,
+ * we have to introduce a conflict between a reader and a writer that act on the same data concurrently.
+ * <p>
+ *
+ * This is done by the reader indicating that it is interested in changes to a piece of data by using a fence
+ * in its transaction. If there are no changes to the data when the reader tries to commit the transaction
+ * containing the fence, the commit succeeds.
+ * <p>
+ *
+ * On the other hand, a writer updates the same data in a transaction. After the write transaction is committed,
+ * the writer then waits on the fence to ensure that all in-progress readers are aware of this update.
+ * When the wait on the fence returns successfully, it means that
+ * any in-progress readers that have not seen the change will not be allowed to commit anymore. This will
+ * force the readers to start a new transaction, and this ensures that the changes made by the writer are visible
+ * to the readers.
+ * <p>
+ *
+ * In case an in-progress reader commits while the writer is waiting on the fence, the wait method will retry
+ * until the given timeout.
+ * <p>
+ *
+ * Hence a successful await on a fence ensures that any reader (using the same fence) that successfully commits from
+ * this point onwards will see the change.
+ *
+ * <p>
+ * Sample reader code:
+ * <pre>
+ * <code>
+ * TransactionAware fence = VisibilityFence.create(fenceId);
+ * TransactionContext readTxContext = new TransactionContext(txClient, fence, table1, table2, ...);
+ * readTxContext.start();
+ *
+ * // do operations using table1, table2, etc.
+ *
+ * // finally commit
+ * try {
+ *   readTxContext.finish();
+ * } catch (TransactionConflictException e) {
+ *   // handle conflict by aborting and starting over with a new transaction
+ * }
+ * </code>
+ * </pre>
+ * <p>
+ *
+ * Sample writer code:
+ * <pre>
+ * <code>
+ * // start transaction
+ * // write change
+ * // commit transaction
+ *
+ * // Now wait on the fence (with the same fenceId as the readers) to ensure that all in-progress readers are
+ * // aware of this change
+ * try {
+ *   FenceWait fenceWait = VisibilityFence.prepareWait(fenceId, txClient);
+ *   fenceWait.await(50000, TimeUnit.MILLISECONDS);
+ * } catch (TimeoutException e) {
+ *   // await timed out, the change may not be visible to all in-progress readers.
+ *   // Application has two options at this point:
+ *   // 1. Revert the write. Re-try the write and fence wait again.
+ *   // 2. Retry only the wait with the same fenceWait object (retry logic is not shown here).
+ * }
+ * </code>
+ * </pre>
+ *
+ * fenceId in the above samples refers to any id that both the readers and writer know for a given
+ * piece of data. Both readers and writer will have to use the same fenceId to synchronize on a given piece of data.
+ * Typically fenceId uniquely identifies the data in question.
+ * For example, if the data is a table row, the fenceId can be composed of table name and row key.
+ * If the data is a table cell, the fenceId can be composed of table name, row key, and column key.
+ * <p>
+ *
+ * Note that in this implementation, if a reader starts a transaction after the write is committed, and a writer
+ * successfully starts and completes an await on the fence while that read transaction is still in progress, then
+ * the reader will get a conflict when committing the fence, even though it has seen the latest changes.
+ * This is because today there is no way to determine the commit time of a transaction.
+ */
+public final class VisibilityFence {
+  private VisibilityFence() {
+    // Cannot instantiate this class, all functionality is through static methods.
+  }
+
+  /**
+   * Used by a reader to get a fence that can be added to its transaction context.
+   *
+   * @param fenceId uniquely identifies the data that this fence is used to synchronize.
+   *                If the data is a table cell then this id can be composed of the table name, row key
+   *                and column key for the data.
+   * @return {@link TransactionAware} to be added to reader's transaction context.
+   */
+  public static TransactionAware create(byte[] fenceId) {
+    return new ReadFence(fenceId);
+  }
+
+  /**
+   * Used by a writer to wait on a fence so that changes are visible to all readers with in-progress transactions.
+   * The returned {@link FenceWait} should be reused when retrying a wait that timed out.
+   *
+   * @param fenceId uniquely identifies the data that this fence is used to synchronize.
+   *                If the data is a table cell then this id can be composed of the table name, row key
+   *                and column key for the data.
+   * @param txClient client used to run the fence wait transactions
+   * @return {@link FenceWait} object
+   * @throws TransactionFailureException declared by the signature
+   * @throws InterruptedException declared by the signature
+   * @throws TimeoutException declared by the signature
+   */
+  public static FenceWait prepareWait(byte[] fenceId, TransactionSystemClient txClient)
+    throws TransactionFailureException, InterruptedException, TimeoutException {
+    // NOTE(review): this body only constructs objects and does not appear to throw the declared checked
+    // exceptions; they are kept because removing them would break callers that catch them.
+    return new DefaultFenceWait(new TransactionContext(txClient, new WriteFence(fenceId)));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/visibility/WriteFence.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/visibility/WriteFence.java b/tephra-core/src/main/java/org/apache/tephra/visibility/WriteFence.java
new file mode 100644
index 0000000..fe59efe
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/visibility/WriteFence.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.visibility;
+
+import com.google.common.primitives.Bytes;
+import com.google.common.primitives.Longs;
+import com.google.common.primitives.UnsignedBytes;
+import org.apache.tephra.Transaction;
+import org.apache.tephra.TransactionAware;
+
+import java.util.Collection;
+import java.util.TreeSet;
+
+/**
+ * Implementation used by {@link FenceWait} to wait for a {@link VisibilityFence}.
+ */
+class WriteFence implements TransactionAware {
+ private final byte[] fenceId;
+ private Transaction tx;
+ private Collection<byte[]> inProgressChanges;
+
+ public WriteFence(byte[] fenceId) {
+ this.fenceId = fenceId;
+ }
+
+ @Override
+ public void startTx(Transaction tx) {
+ this.tx = tx;
+ if (inProgressChanges == null) {
+ inProgressChanges = new TreeSet<>(UnsignedBytes.lexicographicalComparator());
+ for (long inProgressTx : tx.getInProgress()) {
+ inProgressChanges.add(Bytes.concat(fenceId, Longs.toByteArray(inProgressTx)));
+ }
+ }
+ }
+
+ @Override
+ public void updateTx(Transaction tx) {
+ // Fences only need original transaction
+ }
+
+ @Override
+ public Collection<byte[]> getTxChanges() {
+ if (inProgressChanges == null || tx == null) {
+ throw new IllegalStateException("Transaction has not started yet");
+ }
+ return inProgressChanges;
+ }
+
+ @Override
+ public boolean commitTx() throws Exception {
+ // Nothing to persist
+ return true;
+ }
+
+ @Override
+ public void postTxCommit() {
+ tx = null;
+ }
+
+ @Override
+ public boolean rollbackTx() throws Exception {
+ // Nothing to rollback
+ return true;
+ }
+
+ @Override
+ public String getTransactionAwareName() {
+ return getClass().getSimpleName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicACLData.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicACLData.java b/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicACLData.java
new file mode 100644
index 0000000..8d95a42
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicACLData.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.zookeeper;
+
+import org.apache.twill.zookeeper.ACLData;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+
+/**
+ * A straightforward implementation of {@link ACLData}: holds the ACL list and {@link Stat} it was constructed
+ * with and returns them as-is (no defensive copies are made).
+ */
+final class BasicACLData implements ACLData {
+
+  private final Stat stat;
+  private final List<ACL> acl;
+
+  BasicACLData(List<ACL> acl, Stat stat) {
+    this.stat = stat;
+    this.acl = acl;
+  }
+
+  @Override
+  public Stat getStat() {
+    return stat;
+  }
+
+  @Override
+  public List<ACL> getACL() {
+    return acl;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeChildren.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeChildren.java b/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeChildren.java
new file mode 100644
index 0000000..80edddb
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeChildren.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.zookeeper;
+
+import com.google.common.base.Objects;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+
+/**
+ * {@link NodeChildren} value holder for the child names of a ZooKeeper node plus the node's
+ * {@link Stat}. Equality is defined against any {@link NodeChildren} implementation: the stat and
+ * the child list must both be equal.
+ */
+final class BasicNodeChildren implements NodeChildren {
+
+  private final List<String> childNames;
+  private final Stat nodeStat;
+
+  BasicNodeChildren(List<String> children, Stat stat) {
+    this.childNames = children;
+    this.nodeStat = stat;
+  }
+
+  @Override
+  public List<String> getChildren() {
+    return childNames;
+  }
+
+  @Override
+  public Stat getStat() {
+    return nodeStat;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == this) {
+      return true;
+    }
+    // instanceof already yields false for null, so no separate null check is required
+    if (!(other instanceof NodeChildren)) {
+      return false;
+    }
+    NodeChildren rhs = (NodeChildren) other;
+    return nodeStat.equals(rhs.getStat()) && childNames.equals(rhs.getChildren());
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(childNames, nodeStat);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeData.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeData.java b/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeData.java
new file mode 100644
index 0000000..5df3475
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/zookeeper/BasicNodeData.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.zookeeper;
+
+import com.google.common.base.Objects;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.Arrays;
+
+/**
+ * A straightforward implementation of {@link NodeData}. The data array is stored and returned
+ * as-is (no defensive copy). Equality is content-based and works against any {@link NodeData}
+ * implementation.
+ */
+final class BasicNodeData implements NodeData {
+
+  private final byte[] data;
+  private final Stat stat;
+
+  /**
+   * @param data node payload; kept by reference, not copied
+   * @param stat node {@link Stat}
+   */
+  BasicNodeData(byte[] data, Stat stat) {
+    this.data = data;
+    this.stat = stat;
+  }
+
+  @Override
+  public Stat getStat() {
+    return stat;
+  }
+
+  @Override
+  public byte[] getData() {
+    return data;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof NodeData)) {
+      return false;
+    }
+    // Cast to the interface that was checked above; casting to BasicNodeData would throw
+    // ClassCastException for any other NodeData implementation.
+    NodeData that = (NodeData) o;
+    return Objects.equal(stat, that.getStat()) && Arrays.equals(data, that.getData());
+  }
+
+  @Override
+  public int hashCode() {
+    // Hash the array contents rather than its identity so hashCode stays consistent with the
+    // Arrays.equals comparison used in equals().
+    return Objects.hashCode(Arrays.hashCode(data), stat);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/zookeeper/TephraZKClientService.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/zookeeper/TephraZKClientService.java b/tephra-core/src/main/java/org/apache/tephra/zookeeper/TephraZKClientService.java
new file mode 100644
index 0000000..2bc7a6a
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/zookeeper/TephraZKClientService.java
@@ -0,0 +1,626 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.zookeeper;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Multimap;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.common.Threads;
+import org.apache.twill.internal.zookeeper.SettableOperationFuture;
+import org.apache.twill.zookeeper.ACLData;
+import org.apache.twill.zookeeper.NodeChildren;
+import org.apache.twill.zookeeper.NodeData;
+import org.apache.twill.zookeeper.OperationFuture;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+
+/**
+ * The implementation of {@link ZKClientService}.
+ */
+public class TephraZKClientService extends AbstractService implements ZKClientService, Watcher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TephraZKClientService.class);
+
+ private final String zkStr;
+ private final int sessionTimeout;
+ private final List<Watcher> connectionWatchers;
+ private final Multimap<String, byte[]> authInfos;
+ private final AtomicReference<ZooKeeper> zooKeeper;
+ private final Runnable stopTask;
+ private ExecutorService eventExecutor;
+
+ /**
+ * Create a new instance.
+ * @param zkStr zookeper connection string
+ * @param sessionTimeout timeout in milliseconds
+ * @param connectionWatcher watcher to set
+ * @param authInfos authorization bytes
+ */
+ public TephraZKClientService(String zkStr, int sessionTimeout,
+ Watcher connectionWatcher, Multimap<String, byte[]> authInfos) {
+ this.zkStr = zkStr;
+ this.sessionTimeout = sessionTimeout;
+ this.connectionWatchers = new CopyOnWriteArrayList<>();
+ this.authInfos = copyAuthInfo(authInfos);
+ addConnectionWatcher(connectionWatcher);
+
+ this.zooKeeper = new AtomicReference<>();
+ this.stopTask = createStopTask();
+ }
+
+ @Override
+ public Long getSessionId() {
+ ZooKeeper zk = zooKeeper.get();
+ return zk == null ? null : zk.getSessionId();
+ }
+
+ @Override
+ public String getConnectString() {
+ return zkStr;
+ }
+
+ @Override
+ public Cancellable addConnectionWatcher(final Watcher watcher) {
+ if (watcher == null) {
+ return new Cancellable() {
+ @Override
+ public void cancel() {
+ // No-op
+ }
+ };
+ }
+
+ // Invocation of connection watchers are already done inside the event thread,
+ // hence no need to wrap the watcher again.
+ connectionWatchers.add(watcher);
+ return new Cancellable() {
+ @Override
+ public void cancel() {
+ connectionWatchers.remove(watcher);
+ }
+ };
+ }
+
+ @Override
+ public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode) {
+ return create(path, data, createMode, true);
+ }
+
+ @Override
+ public OperationFuture<String> create(String path, @Nullable byte[] data,
+ CreateMode createMode, boolean createParent) {
+ return create(path, data, createMode, createParent, ZooDefs.Ids.OPEN_ACL_UNSAFE);
+ }
+
+ @Override
+ public OperationFuture<String> create(String path, @Nullable byte[] data,
+ CreateMode createMode, Iterable<ACL> acl) {
+ return create(path, data, createMode, true, acl);
+ }
+
+ @Override
+ public OperationFuture<Stat> exists(String path) {
+ return exists(path, null);
+ }
+
+ @Override
+ public OperationFuture<NodeChildren> getChildren(String path) {
+ return getChildren(path, null);
+ }
+
+ @Override
+ public OperationFuture<NodeData> getData(String path) {
+ return getData(path, null);
+ }
+
+ @Override
+ public OperationFuture<Stat> setData(String path, byte[] data) {
+ return setData(path, data, -1);
+ }
+
+ @Override
+ public OperationFuture<String> delete(String path) {
+ return delete(path, -1);
+ }
+
+ @Override
+ public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl) {
+ return setACL(path, acl, -1);
+ }
+
+ @Override
+ public OperationFuture<String> create(String path, @Nullable byte[] data, CreateMode createMode,
+ boolean createParent, Iterable<ACL> acl) {
+ return doCreate(path, data, createMode, createParent, ImmutableList.copyOf(acl), false);
+ }
+
+ private OperationFuture<String> doCreate(final String path,
+ @Nullable final byte[] data,
+ final CreateMode createMode,
+ final boolean createParent,
+ final List<ACL> acl,
+ final boolean ignoreNodeExists) {
+ final SettableOperationFuture<String> createFuture = SettableOperationFuture.create(path, eventExecutor);
+ getZooKeeper().create(path, data, acl, createMode, Callbacks.STRING, createFuture);
+ if (!createParent) {
+ return createFuture;
+ }
+
+ // If create parent is request, return a different future
+ final SettableOperationFuture<String> result = SettableOperationFuture.create(path, eventExecutor);
+ // Watch for changes in the original future
+ Futures.addCallback(createFuture, new FutureCallback<String>() {
+ @Override
+ public void onSuccess(String path) {
+ // Propagate if creation was successful
+ result.set(path);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // See if the failure can be handled
+ if (updateFailureResult(t, result, path, ignoreNodeExists)) {
+ return;
+ }
+ // Create the parent node
+ String parentPath = getParent(path);
+ if (parentPath.isEmpty()) {
+ result.setException(t);
+ return;
+ }
+ // Watch for parent creation complete. Parent is created with the unsafe ACL.
+ Futures.addCallback(doCreate(parentPath, null, CreateMode.PERSISTENT,
+ true, ZooDefs.Ids.OPEN_ACL_UNSAFE, true), new FutureCallback<String>() {
+ @Override
+ public void onSuccess(String parentPath) {
+ // Create the requested path again
+ Futures.addCallback(
+ doCreate(path, data, createMode, false, acl, ignoreNodeExists), new FutureCallback<String>() {
+ @Override
+ public void onSuccess(String pathResult) {
+ result.set(pathResult);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ // handle the failure
+ updateFailureResult(t, result, path, ignoreNodeExists);
+ }
+ });
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ result.setException(t);
+ }
+ });
+ }
+
+ /**
+ * Updates the result future based on the given {@link Throwable}.
+ * @param t Cause of the failure
+ * @param result Future to be updated
+ * @param path Request path for the operation
+ * @return {@code true} if it is a failure, {@code false} otherwise.
+ */
+ private boolean updateFailureResult(Throwable t, SettableOperationFuture<String> result,
+ String path, boolean ignoreNodeExists) {
+ // Propagate if there is error
+ if (!(t instanceof KeeperException)) {
+ result.setException(t);
+ return true;
+ }
+ KeeperException.Code code = ((KeeperException) t).code();
+ // Node already exists, simply return success if it allows for ignoring node exists (for parent node creation).
+ if (ignoreNodeExists && code == KeeperException.Code.NODEEXISTS) {
+ // The requested path could be used because it only applies to non-sequential node
+ result.set(path);
+ return false;
+ }
+ if (code != KeeperException.Code.NONODE) {
+ result.setException(t);
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Gets the parent of the given path.
+ * @param path Path for computing its parent
+ * @return Parent of the given path, or empty string if the given path is the root path already.
+ */
+ private String getParent(String path) {
+ String parentPath = path.substring(0, path.lastIndexOf('/'));
+ return (parentPath.isEmpty() && !"/".equals(path)) ? "/" : parentPath;
+ }
+ });
+
+ return result;
+ }
+
+ @Override
+ public OperationFuture<Stat> exists(String path, Watcher watcher) {
+ SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor);
+ getZooKeeper().exists(path, wrapWatcher(watcher), Callbacks.STAT_NONODE, result);
+ return result;
+ }
+
+ @Override
+ public OperationFuture<NodeChildren> getChildren(String path, Watcher watcher) {
+ SettableOperationFuture<NodeChildren> result = SettableOperationFuture.create(path, eventExecutor);
+ getZooKeeper().getChildren(path, wrapWatcher(watcher), Callbacks.CHILDREN, result);
+ return result;
+ }
+
+ @Override
+ public OperationFuture<NodeData> getData(String path, Watcher watcher) {
+ SettableOperationFuture<NodeData> result = SettableOperationFuture.create(path, eventExecutor);
+ getZooKeeper().getData(path, wrapWatcher(watcher), Callbacks.DATA, result);
+
+ return result;
+ }
+
+ @Override
+ public OperationFuture<Stat> setData(String dataPath, byte[] data, int version) {
+ SettableOperationFuture<Stat> result = SettableOperationFuture.create(dataPath, eventExecutor);
+ getZooKeeper().setData(dataPath, data, version, Callbacks.STAT, result);
+ return result;
+ }
+
+ @Override
+ public OperationFuture<String> delete(String deletePath, int version) {
+ SettableOperationFuture<String> result = SettableOperationFuture.create(deletePath, eventExecutor);
+ getZooKeeper().delete(deletePath, version, Callbacks.VOID, result);
+ return result;
+ }
+
+ @Override
+ public OperationFuture<ACLData> getACL(String path) {
+ SettableOperationFuture<ACLData> result = SettableOperationFuture.create(path, eventExecutor);
+ getZooKeeper().getACL(path, new Stat(), Callbacks.ACL, result);
+ return result;
+ }
+
+ @Override
+ public OperationFuture<Stat> setACL(String path, Iterable<ACL> acl, int version) {
+ SettableOperationFuture<Stat> result = SettableOperationFuture.create(path, eventExecutor);
+ getZooKeeper().setACL(path, ImmutableList.copyOf(acl), version, Callbacks.STAT, result);
+ return result;
+ }
+
+ @Override
+ public Supplier<ZooKeeper> getZooKeeperSupplier() {
+ return new Supplier<ZooKeeper>() {
+ @Override
+ public ZooKeeper get() {
+ return getZooKeeper();
+ }
+ };
+ }
+
+ @Override
+ protected void doStart() {
+ // A single thread executor for all events
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ Threads.createDaemonThreadFactory("zk-client-EventThread"));
+ // Just discard the execution if the executor is closed
+ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
+ eventExecutor = executor;
+
+ try {
+ zooKeeper.set(createZooKeeper());
+ } catch (IOException e) {
+ notifyFailed(e);
+ }
+ }
+
+ @Override
+ protected void doStop() {
+ // Submit a task to the executor to make sure all pending events in the executor are fired before
+ // transiting this Service into STOPPED state
+ eventExecutor.submit(stopTask);
+ eventExecutor.shutdown();
+ }
+
+ /**
+ * @return Current {@link ZooKeeper} client.
+ */
+ private ZooKeeper getZooKeeper() {
+ ZooKeeper zk = zooKeeper.get();
+ Preconditions.checkArgument(zk != null, "Not connected to zooKeeper.");
+ return zk;
+ }
+
+ /**
+ * Wraps the given watcher to be called from the event executor.
+ * @param watcher Watcher to be wrapped
+ * @return The wrapped Watcher
+ */
+ private Watcher wrapWatcher(final Watcher watcher) {
+ if (watcher == null) {
+ return null;
+ }
+ return new Watcher() {
+ @Override
+ public void process(final WatchedEvent event) {
+ if (eventExecutor.isShutdown()) {
+ LOG.debug("Already shutdown. Discarding event: {}", event);
+ return;
+ }
+ eventExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ watcher.process(event);
+ } catch (Throwable t) {
+ LOG.error("Watcher throws exception.", t);
+ }
+ }
+ });
+ }
+ };
+ }
+
+ /**
+ * Creates a deep copy of the given authInfos multimap.
+ */
+ private Multimap<String, byte[]> copyAuthInfo(Multimap<String, byte[]> authInfos) {
+ Multimap<String, byte[]> result = ArrayListMultimap.create();
+
+ for (Map.Entry<String, byte[]> entry : authInfos.entries()) {
+ byte[] info = entry.getValue();
+ result.put(entry.getKey(), info == null ? null : Arrays.copyOf(info, info.length));
+ }
+
+ return result;
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ State state = state();
+ if (state == State.TERMINATED || state == State.FAILED) {
+ return;
+ }
+
+ try {
+ if (event.getState() == Event.KeeperState.SyncConnected && state == State.STARTING) {
+ LOG.debug("Connected to ZooKeeper: {}", zkStr);
+ notifyStarted();
+ return;
+ }
+ if (event.getState() == Event.KeeperState.Expired) {
+ LOG.info("ZooKeeper session expired: {}", zkStr);
+
+ // When connection expired, simply reconnect again
+ if (state != State.RUNNING) {
+ return;
+ }
+ eventExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ // Only reconnect if the current state is running
+ if (state() != State.RUNNING) {
+ return;
+ }
+ try {
+ LOG.info("Reconnect to ZooKeeper due to expiration: {}", zkStr);
+ closeZooKeeper(zooKeeper.getAndSet(createZooKeeper()));
+ } catch (IOException e) {
+ notifyFailed(e);
+ }
+ }
+ });
+ }
+ } finally {
+ if (event.getType() == Event.EventType.None) {
+ for (Watcher connectionWatcher : connectionWatchers) {
+ connectionWatcher.process(event);
+ }
+ }
+ }
+ }
+
+ /**
+ * Creates a new ZooKeeper connection.
+ */
+ private ZooKeeper createZooKeeper() throws IOException {
+ ZooKeeper zk = new ZooKeeper(zkStr, sessionTimeout, wrapWatcher(this));
+ for (Map.Entry<String, byte[]> authInfo : authInfos.entries()) {
+ zk.addAuthInfo(authInfo.getKey(), authInfo.getValue());
+ }
+ return zk;
+ }
+
+ /**
+ * Closes the given {@link ZooKeeper} if it is not null. If there is InterruptedException,
+ * it will get logged.
+ */
+ private void closeZooKeeper(@Nullable ZooKeeper zk) {
+ try {
+ if (zk != null) {
+ zk.close();
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted when closing ZooKeeper", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Creates a {@link Runnable} task that will get executed in the event executor for transiting this
+ * Service into STOPPED state.
+ */
+ private Runnable createStopTask() {
+ return new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // Close the ZK connection in this task will make sure if there is ZK connection created
+ // after doStop() was called but before this task has been executed is also closed.
+ // It is possible to happen when the following sequence happens:
+ //
+ // 1. session expired, hence the expired event is triggered
+ // 2. The reconnect task executed. With Service.state() == RUNNING, it creates a new ZK client
+ // 3. Service.stop() gets called, Service.state() changed to STOPPING
+ // 4. The new ZK client created from the reconnect thread update the zooKeeper with the new one
+ closeZooKeeper(zooKeeper.getAndSet(null));
+ notifyStopped();
+ } catch (Exception e) {
+ notifyFailed(e);
+ }
+ }
+ };
+ }
+
+ /**
+ * Collection of generic callbacks that simply reflect results into OperationFuture.
+ */
+ private static final class Callbacks {
+ static final AsyncCallback.StringCallback STRING = new AsyncCallback.StringCallback() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processResult(int rc, String path, Object ctx, String name) {
+ SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx;
+ KeeperException.Code code = KeeperException.Code.get(rc);
+ if (code == KeeperException.Code.OK) {
+ result.set((name == null || name.isEmpty()) ? path : name);
+ return;
+ }
+ result.setException(KeeperException.create(code, result.getRequestPath()));
+ }
+ };
+
+ static final AsyncCallback.StatCallback STAT = new AsyncCallback.StatCallback() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processResult(int rc, String path, Object ctx, Stat stat) {
+ SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx;
+ KeeperException.Code code = KeeperException.Code.get(rc);
+ if (code == KeeperException.Code.OK) {
+ result.set(stat);
+ return;
+ }
+ result.setException(KeeperException.create(code, result.getRequestPath()));
+ }
+ };
+
+ /**
+ * A stat callback that treats NONODE as success.
+ */
+ static final AsyncCallback.StatCallback STAT_NONODE = new AsyncCallback.StatCallback() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processResult(int rc, String path, Object ctx, Stat stat) {
+ SettableOperationFuture<Stat> result = (SettableOperationFuture<Stat>) ctx;
+ KeeperException.Code code = KeeperException.Code.get(rc);
+ if (code == KeeperException.Code.OK || code == KeeperException.Code.NONODE) {
+ result.set(stat);
+ return;
+ }
+ result.setException(KeeperException.create(code, result.getRequestPath()));
+ }
+ };
+
+ static final AsyncCallback.Children2Callback CHILDREN = new AsyncCallback.Children2Callback() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
+ SettableOperationFuture<NodeChildren> result = (SettableOperationFuture<NodeChildren>) ctx;
+ KeeperException.Code code = KeeperException.Code.get(rc);
+ if (code == KeeperException.Code.OK) {
+ result.set(new BasicNodeChildren(children, stat));
+ return;
+ }
+ result.setException(KeeperException.create(code, result.getRequestPath()));
+ }
+ };
+
+ static final AsyncCallback.DataCallback DATA = new AsyncCallback.DataCallback() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
+ SettableOperationFuture<NodeData> result = (SettableOperationFuture<NodeData>) ctx;
+ KeeperException.Code code = KeeperException.Code.get(rc);
+ if (code == KeeperException.Code.OK) {
+ result.set(new BasicNodeData(data, stat));
+ return;
+ }
+ result.setException(KeeperException.create(code, result.getRequestPath()));
+ }
+ };
+
+ static final AsyncCallback.VoidCallback VOID = new AsyncCallback.VoidCallback() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processResult(int rc, String path, Object ctx) {
+ SettableOperationFuture<String> result = (SettableOperationFuture<String>) ctx;
+ KeeperException.Code code = KeeperException.Code.get(rc);
+ if (code == KeeperException.Code.OK) {
+ result.set(result.getRequestPath());
+ return;
+ }
+ // Otherwise, it is an error
+ result.setException(KeeperException.create(code, result.getRequestPath()));
+ }
+ };
+
+ static final AsyncCallback.ACLCallback ACL = new AsyncCallback.ACLCallback() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) {
+ SettableOperationFuture<ACLData> result = (SettableOperationFuture<ACLData>) ctx;
+ KeeperException.Code code = KeeperException.Code.get(rc);
+ if (code == KeeperException.Code.OK) {
+ result.set(new BasicACLData(acl, stat));
+ return;
+ }
+ result.setException(KeeperException.create(code, result.getRequestPath()));
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/thrift/transaction.thrift
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/thrift/transaction.thrift b/tephra-core/src/main/thrift/transaction.thrift
index 39e6cec..0e3d712 100644
--- a/tephra-core/src/main/thrift/transaction.thrift
+++ b/tephra-core/src/main/thrift/transaction.thrift
@@ -16,7 +16,7 @@
# limitations under the License.
#
-namespace java co.cask.tephra.distributed.thrift
+namespace java org.apache.tephra.distributed.thrift
enum TTransactionType {
SHORT = 1,
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/ThriftTransactionSystemTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/ThriftTransactionSystemTest.java b/tephra-core/src/test/java/co/cask/tephra/ThriftTransactionSystemTest.java
deleted file mode 100644
index cf01d25..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/ThriftTransactionSystemTest.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import co.cask.tephra.distributed.TransactionService;
-import co.cask.tephra.persist.InMemoryTransactionStateStorage;
-import co.cask.tephra.persist.TransactionStateStorage;
-import co.cask.tephra.runtime.ConfigModule;
-import co.cask.tephra.runtime.DiscoveryModules;
-import co.cask.tephra.runtime.TransactionClientModule;
-import co.cask.tephra.runtime.TransactionModules;
-import co.cask.tephra.runtime.ZKModule;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Scopes;
-import com.google.inject.util.Modules;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Runs the {@link TransactionSystemTest} suite against the distributed {@link TransactionService},
- * using an in-memory ZooKeeper server and an in-memory {@link TransactionStateStorage}.
- */
-public class ThriftTransactionSystemTest extends TransactionSystemTest {
- private static final Logger LOG = LoggerFactory.getLogger(ThriftTransactionSystemTest.class);
-
- private static InMemoryZKServer zkServer;
- private static ZKClientService zkClientService;
- private static TransactionService txService;
- private static TransactionStateStorage storage;
- private static TransactionSystemClient txClient;
-
- @ClassRule
- public static TemporaryFolder tmpFolder = new TemporaryFolder();
-
- /**
- * Starts ZooKeeper, the ZK client, and the transaction service once for the whole class.
- * Persistence is disabled, and the client is configured for a single attempt ("n-times", 1).
- */
- @BeforeClass
- public static void start() throws Exception {
- zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
- zkServer.startAndWait();
-
- Configuration conf = new Configuration();
- conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
- conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
- conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
- conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
-
- // Override the distributed modules so state storage is in-memory (and a singleton shared
- // between the service and this test).
- Injector injector = Guice.createInjector(
- new ConfigModule(conf),
- new ZKModule(),
- new DiscoveryModules().getDistributedModules(),
- Modules.override(new TransactionModules().getDistributedModules())
- .with(new AbstractModule() {
- @Override
- protected void configure() {
- bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
- }
- }),
- new TransactionClientModule()
- );
-
- zkClientService = injector.getInstance(ZKClientService.class);
- zkClientService.startAndWait();
-
- // start a tx server
- txService = injector.getInstance(TransactionService.class);
- storage = injector.getInstance(TransactionStateStorage.class);
- txClient = injector.getInstance(TransactionSystemClient.class);
- try {
- LOG.info("Starting transaction service");
- txService.startAndWait();
- } catch (Exception e) {
- // NOTE(review): a start failure is only logged, so the tests then fail later with less
- // obvious errors; consider rethrowing.
- LOG.error("Failed to start service: ", e);
- }
- }
-
- /** Resets transaction state before each test so tests do not affect each other. */
- @Before
- public void reset() throws Exception {
- getClient().resetState();
- }
-
- /**
- * Stops the services in reverse order of startup.
- * NOTE(review): if one stopAndWait() throws, the remaining services are not stopped.
- */
- @AfterClass
- public static void stop() throws Exception {
- txService.stopAndWait();
- storage.stopAndWait();
- zkClientService.stopAndWait();
- zkServer.stopAndWait();
- }
-
- @Override
- protected TransactionSystemClient getClient() throws Exception {
- return txClient;
- }
-
- @Override
- protected TransactionStateStorage getStateStorage() throws Exception {
- return storage;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/test/java/co/cask/tephra/TransactionAdminTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/co/cask/tephra/TransactionAdminTest.java b/tephra-core/src/test/java/co/cask/tephra/TransactionAdminTest.java
deleted file mode 100644
index de02f80..0000000
--- a/tephra-core/src/test/java/co/cask/tephra/TransactionAdminTest.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra;
-
-import co.cask.tephra.distributed.TransactionService;
-import co.cask.tephra.persist.InMemoryTransactionStateStorage;
-import co.cask.tephra.persist.TransactionStateStorage;
-import co.cask.tephra.runtime.ConfigModule;
-import co.cask.tephra.runtime.DiscoveryModules;
-import co.cask.tephra.runtime.TransactionClientModule;
-import co.cask.tephra.runtime.TransactionModules;
-import co.cask.tephra.runtime.ZKModule;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Scopes;
-import com.google.inject.util.Modules;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.twill.internal.zookeeper.InMemoryZKServer;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.util.concurrent.TimeUnit;
-
-public class TransactionAdminTest {
- private static final Logger LOG = LoggerFactory.getLogger(TransactionAdminTest.class);
-
- private static Configuration conf;
- private static InMemoryZKServer zkServer;
- private static ZKClientService zkClientService;
- private static TransactionService txService;
- private static TransactionSystemClient txClient;
-
- @ClassRule
- public static TemporaryFolder tmpFolder = new TemporaryFolder();
-
- @BeforeClass
- public static void start() throws Exception {
- zkServer = InMemoryZKServer.builder().setDataDir(tmpFolder.newFolder()).build();
- zkServer.startAndWait();
-
- conf = new Configuration();
- conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
- conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkServer.getConnectionStr());
- conf.set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, "n-times");
- conf.setInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, 1);
-
- Injector injector = Guice.createInjector(
- new ConfigModule(conf),
- new ZKModule(),
- new DiscoveryModules().getDistributedModules(),
- Modules.override(new TransactionModules().getDistributedModules())
- .with(new AbstractModule() {
- @Override
- protected void configure() {
- bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
- }
- }),
- new TransactionClientModule()
- );
-
- zkClientService = injector.getInstance(ZKClientService.class);
- zkClientService.startAndWait();
-
- // start a tx server
- txService = injector.getInstance(TransactionService.class);
- txClient = injector.getInstance(TransactionSystemClient.class);
- try {
- LOG.info("Starting transaction service");
- txService.startAndWait();
- } catch (Exception e) {
- LOG.error("Failed to start service: ", e);
- }
- }
-
- @Before
- public void reset() throws Exception {
- txClient.resetState();
- }
-
- @AfterClass
- public static void stop() throws Exception {
- txService.stopAndWait();
- zkClientService.stopAndWait();
- zkServer.stopAndWait();
- }
-
- @Test
- public void testPrintUsage() throws Exception {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayOutputStream err = new ByteArrayOutputStream();
- TransactionAdmin txAdmin = new TransactionAdmin(new PrintStream(out), new PrintStream(err));
- int status = txAdmin.doMain(new String[0], conf);
- Assert.assertEquals(1, status);
- //noinspection ConstantConditions
- Assert.assertTrue(err.toString("UTF-8").startsWith("Usage:"));
- Assert.assertEquals(0, out.toByteArray().length);
- }
-
- @Test
- public void testTruncateInvalidTx() throws Exception {
- Transaction tx1 = txClient.startLong();
- Transaction tx2 = txClient.startShort();
- txClient.invalidate(tx1.getTransactionId());
- txClient.invalidate(tx2.getTransactionId());
- Assert.assertEquals(2, txClient.getInvalidSize());
-
- TransactionAdmin txAdmin = new TransactionAdmin(new PrintStream(System.out), new PrintStream(System.err));
- int status = txAdmin.doMain(new String[]{"--truncate-invalid-tx", String.valueOf(tx2.getTransactionId())}, conf);
- Assert.assertEquals(0, status);
- Assert.assertEquals(1, txClient.getInvalidSize());
- }
-
- @Test
- public void testTruncateInvalidTxBefore() throws Exception {
- Transaction tx1 = txClient.startLong();
- TimeUnit.MILLISECONDS.sleep(1);
- long beforeTx2 = System.currentTimeMillis();
- Transaction tx2 = txClient.startLong();
-
- // Try before invalidation
- Assert.assertEquals(0, txClient.getInvalidSize());
- TransactionAdmin txAdmin = new TransactionAdmin(new PrintStream(System.out), new PrintStream(System.err));
- int status = txAdmin.doMain(new String[]{"--truncate-invalid-tx-before", String.valueOf(beforeTx2)}, conf);
- // Assert command failed due to in-progress transactions
- Assert.assertEquals(1, status);
- // Assert no change to invalid size
- Assert.assertEquals(0, txClient.getInvalidSize());
-
- txClient.invalidate(tx1.getTransactionId());
- txClient.invalidate(tx2.getTransactionId());
- Assert.assertEquals(2, txClient.getInvalidSize());
-
- status = txAdmin.doMain(new String[]{"--truncate-invalid-tx-before", String.valueOf(beforeTx2)}, conf);
- Assert.assertEquals(0, status);
- Assert.assertEquals(1, txClient.getInvalidSize());
- }
-
- @Test
- public void testGetInvalidTxSize() throws Exception {
- Transaction tx1 = txClient.startShort();
- txClient.startLong();
- txClient.invalidate(tx1.getTransactionId());
-
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- ByteArrayOutputStream err = new ByteArrayOutputStream();
- TransactionAdmin txAdmin = new TransactionAdmin(new PrintStream(out), new PrintStream(err));
- int status = txAdmin.doMain(new String[]{"--get-invalid-tx-size"}, conf);
- Assert.assertEquals(0, status);
- //noinspection ConstantConditions
- Assert.assertTrue(out.toString("UTF-8").contains("Invalid list size: 1\n"));
- }
-}