You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/06 23:02:54 UTC
[48/51] [partial] incubator-tephra git commit: Rename package to
org.apache.tephra
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCache.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCache.java b/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCache.java
deleted file mode 100644
index 2745c76..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCache.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.coprocessor;
-
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.persist.HDFSTransactionStateStorage;
-import co.cask.tephra.persist.TransactionStateStorage;
-import co.cask.tephra.persist.TransactionVisibilityState;
-import co.cask.tephra.snapshot.SnapshotCodecProvider;
-import co.cask.tephra.util.ConfigurationFactory;
-import com.google.common.util.concurrent.AbstractIdleService;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Periodically refreshes transaction state from the latest stored snapshot. This is implemented as a singleton
- * to allow a single cache to be shared by all regions on a regionserver.
- */
-public class TransactionStateCache extends AbstractIdleService implements Configurable {
- private static final Log LOG = LogFactory.getLog(TransactionStateCache.class);
-
- // how frequently we should wake to check for changes (in seconds)
- private static final long CHECK_FREQUENCY = 15;
-
- private Configuration hConf;
-
- private TransactionStateStorage storage;
- private volatile TransactionVisibilityState latestState;
-
- private Thread refreshService;
- private long lastRefresh;
- // snapshot refresh frequency in milliseconds
- private long snapshotRefreshFrequency;
- private boolean initialized;
-
- public TransactionStateCache() {
- }
-
- @Override
- public Configuration getConf() {
- return hConf;
- }
-
- @Override
- public void setConf(Configuration conf) {
- this.hConf = conf;
- }
-
- @Override
- protected void startUp() throws Exception {
- refreshState();
- startRefreshService();
- }
-
- @Override
- protected void shutDown() throws Exception {
- this.refreshService.interrupt();
- this.storage.stop();
- }
-
- /**
- * Try to initialize the Configuration and TransactionStateStorage instances. Obtaining the Configuration may
- * fail until ReactorServiceMain has been started.
- */
- private void tryInit() {
- try {
- Configuration conf = getSnapshotConfiguration();
- if (conf != null) {
- // Since this is only used for background loading of transaction snapshots, we use the no-op metrics collector,
- // as there are no relevant metrics to report
- this.storage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf),
- new TxMetricsCollector());
- this.storage.startAndWait();
- this.snapshotRefreshFrequency = conf.getLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL,
- TxConstants.Manager.DEFAULT_TX_SNAPSHOT_INTERVAL) * 1000;
- this.initialized = true;
- } else {
- LOG.info("Could not load configuration");
- }
- } catch (Exception e) {
- LOG.info("Failed to initialize TransactionStateCache due to: " + e.getMessage());
- }
- }
-
- protected Configuration getSnapshotConfiguration() throws IOException {
- Configuration conf = new ConfigurationFactory().get(hConf);
- conf.unset(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
- return conf;
- }
-
- private void reset() {
- this.storage.stop();
- this.lastRefresh = 0;
- this.initialized = false;
- }
-
- private void startRefreshService() {
- this.refreshService = new Thread("tx-state-refresh") {
- @Override
- public void run() {
- while (!isInterrupted()) {
- if (latestState == null || System.currentTimeMillis() > (lastRefresh + snapshotRefreshFrequency)) {
- try {
- refreshState();
- } catch (IOException ioe) {
- LOG.info("Error refreshing transaction state cache: " + ioe.getMessage());
- }
- }
- try {
- TimeUnit.SECONDS.sleep(CHECK_FREQUENCY);
- } catch (InterruptedException ie) {
- // reset status
- interrupt();
- break;
- }
- }
- LOG.info("Exiting thread " + getName());
- }
- };
- this.refreshService.setDaemon(true);
- this.refreshService.start();
- }
-
- private void refreshState() throws IOException {
- if (!initialized) {
- tryInit();
- }
-
- // only continue if initialization was successful
- if (initialized) {
- long now = System.currentTimeMillis();
- TransactionVisibilityState currentState = storage.getLatestTransactionVisibilityState();
- if (currentState != null) {
- if (currentState.getTimestamp() < (now - 2 * snapshotRefreshFrequency)) {
- LOG.info("Current snapshot is old, will force a refresh on next run.");
- reset();
- } else {
- latestState = currentState;
- LOG.info("Transaction state reloaded with snapshot from " + latestState.getTimestamp());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Latest transaction snapshot: " + latestState.toString());
- }
- lastRefresh = now;
- }
- } else {
- LOG.info("No transaction state found.");
- }
- }
- }
-
- public TransactionVisibilityState getLatestState() {
- return latestState;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCacheSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCacheSupplier.java b/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCacheSupplier.java
deleted file mode 100644
index 9f4a778..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCacheSupplier.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.coprocessor;
-
-import com.google.common.base.Supplier;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Supplies instances of {@link TransactionStateCache} implementations.
- */
-public class TransactionStateCacheSupplier implements Supplier<TransactionStateCache> {
- protected static volatile TransactionStateCache instance;
- protected static Object lock = new Object();
-
- protected final Configuration conf;
-
- public TransactionStateCacheSupplier(Configuration conf) {
- this.conf = conf;
- }
-
- /**
- * Returns a singleton instance of the transaction state cache, performing lazy initialization if necessary.
- * @return A shared instance of the transaction state cache.
- */
- @Override
- public TransactionStateCache get() {
- if (instance == null) {
- synchronized (lock) {
- if (instance == null) {
- instance = new TransactionStateCache();
- instance.setConf(conf);
- instance.start();
- }
- }
- }
- return instance;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/coprocessor/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/coprocessor/package-info.java b/tephra-core/src/main/java/co/cask/tephra/coprocessor/package-info.java
deleted file mode 100644
index 5fa566c..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/coprocessor/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains HBase coprocessor implementations for the transaction system.
- */
-package co.cask.tephra.coprocessor;
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/AbstractClientProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/AbstractClientProvider.java b/tephra-core/src/main/java/co/cask/tephra/distributed/AbstractClientProvider.java
deleted file mode 100644
index d0a3603..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/AbstractClientProvider.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import co.cask.tephra.TxConstants;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.twill.discovery.Discoverable;
-import org.apache.twill.discovery.DiscoveryServiceClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * An abstract tx client provider that implements common functionality.
- */
-public abstract class AbstractClientProvider implements ThriftClientProvider {
-
- private static final Logger LOG = LoggerFactory.getLogger(AbstractClientProvider.class);
-
- // Discovery service. If null, no service discovery.
- private final DiscoveryServiceClient discoveryServiceClient;
- protected final AtomicBoolean initialized = new AtomicBoolean(false);
-
- // the configuration
- final Configuration configuration;
-
- // the endpoint strategy for service discovery.
- EndpointStrategy endpointStrategy;
-
- protected AbstractClientProvider(Configuration configuration, DiscoveryServiceClient discoveryServiceClient) {
- this.configuration = configuration;
- this.discoveryServiceClient = discoveryServiceClient;
- }
-
- public void initialize() throws TException {
- if (initialized.compareAndSet(false, true)) {
- this.initDiscovery();
- }
- }
-
- /**
- * Initialize the service discovery client, we will reuse that
- * every time we need to create a new client.
- */
- private void initDiscovery() {
- if (discoveryServiceClient == null) {
- LOG.info("No DiscoveryServiceClient provided. Skipping service discovery.");
- return;
- }
-
- endpointStrategy = new TimeLimitEndpointStrategy(
- new RandomEndpointStrategy(
- discoveryServiceClient.discover(
- configuration.get(TxConstants.Service.CFG_DATA_TX_DISCOVERY_SERVICE_NAME,
- TxConstants.Service.DEFAULT_DATA_TX_DISCOVERY_SERVICE_NAME))),
- 2, TimeUnit.SECONDS);
- }
-
- protected TransactionServiceThriftClient newClient() throws TException {
- return newClient(-1);
- }
-
- protected TransactionServiceThriftClient newClient(int timeout) throws TException {
- initialize();
- String address;
- int port;
-
- if (endpointStrategy == null) {
- // if there is no discovery service, try to read host and port directly
- // from the configuration
- LOG.info("Reading address and port from configuration.");
- address = configuration.get(TxConstants.Service.CFG_DATA_TX_BIND_ADDRESS,
- TxConstants.Service.DEFAULT_DATA_TX_BIND_ADDRESS);
- port = configuration.getInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT,
- TxConstants.Service.DEFAULT_DATA_TX_BIND_PORT);
- LOG.info("Service assumed at " + address + ":" + port);
- } else {
- Discoverable endpoint = endpointStrategy.pick();
- if (endpoint == null) {
- LOG.error("Unable to discover tx service.");
- throw new TException("Unable to discover tx service.");
- }
- address = endpoint.getSocketAddress().getHostName();
- port = endpoint.getSocketAddress().getPort();
- LOG.info("Service discovered at " + address + ":" + port);
- }
-
- // now we have an address and port, try to connect a client
- if (timeout < 0) {
- timeout = configuration.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_TIMEOUT,
- TxConstants.Service.DEFAULT_DATA_TX_CLIENT_TIMEOUT_MS);
- }
- LOG.info("Attempting to connect to tx service at " +
- address + ":" + port + " with timeout " + timeout + " ms.");
- // thrift transport layer
- TTransport transport =
- new TFramedTransport(new TSocket(address, port, timeout));
- try {
- transport.open();
- } catch (TTransportException e) {
- LOG.error("Unable to connect to tx service: " + e.getMessage());
- throw e;
- }
- // and create a thrift client
- TransactionServiceThriftClient newClient = new TransactionServiceThriftClient(transport);
-
- LOG.info("Connected to tx service at " +
- address + ":" + port);
- return newClient;
- }
-
- /**
- * This class helps picking up an endpoint from a list of Discoverable.
- */
- public interface EndpointStrategy {
-
- /**
- * Picks a {@link Discoverable} using its strategy.
- * @return A {@link Discoverable} based on the stragegy or {@code null} if no endpoint can be found.
- */
- Discoverable pick();
- }
-
- /**
- * An {@link EndpointStrategy} that make sure it picks an endpoint within the given
- * timeout limit.
- */
- public final class TimeLimitEndpointStrategy implements EndpointStrategy {
-
- private final EndpointStrategy delegate;
- private final long timeout;
- private final TimeUnit timeoutUnit;
-
- public TimeLimitEndpointStrategy(EndpointStrategy delegate, long timeout, TimeUnit timeoutUnit) {
- this.delegate = delegate;
- this.timeout = timeout;
- this.timeoutUnit = timeoutUnit;
- }
-
- @Override
- public Discoverable pick() {
- Discoverable pick = delegate.pick();
- try {
- long count = 0;
- while (pick == null && count++ < timeout) {
- timeoutUnit.sleep(1);
- pick = delegate.pick();
- }
- } catch (InterruptedException e) {
- // Simply propagate the interrupt.
- Thread.currentThread().interrupt();
- }
- return pick;
- }
- }
-
- /**
- * Randomly picks endpoint from the list of available endpoints.
- */
- public final class RandomEndpointStrategy implements EndpointStrategy {
-
- private final Iterable<Discoverable> endpoints;
-
- /**
- * Constructs a random endpoint strategy.
- * @param endpoints Endpoints for the strategy to use. Note that this strategy will
- * invoke {@link Iterable#iterator()} and traverse through it on
- * every call to the {@link #pick()} method. One could leverage this
- * behavior with the live {@link Iterable} as provided by
- * {@link org.apache.twill.discovery.DiscoveryServiceClient#discover(String)} method.
- */
- public RandomEndpointStrategy(Iterable<Discoverable> endpoints) {
- this.endpoints = endpoints;
- }
-
- @Override
- public Discoverable pick() {
- // Reservoir sampling
- Discoverable result = null;
- Iterator<Discoverable> itor = endpoints.iterator();
- Random random = new Random();
- int count = 0;
- while (itor.hasNext()) {
- Discoverable next = itor.next();
- if (random.nextInt(++count) == 0) {
- result = next;
- }
- }
- return result;
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/CloseableThriftClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/CloseableThriftClient.java b/tephra-core/src/main/java/co/cask/tephra/distributed/CloseableThriftClient.java
deleted file mode 100644
index ba6da24..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/CloseableThriftClient.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-/**
- * An {@link AutoCloseable} to automatically return the thrift client to the ThriftClientProvider.
- */
-public class CloseableThriftClient implements AutoCloseable {
-
- private final ThriftClientProvider provider;
- private final TransactionServiceThriftClient thriftClient;
-
- public CloseableThriftClient(ThriftClientProvider provider, TransactionServiceThriftClient thriftClient) {
- this.provider = provider;
- this.thriftClient = thriftClient;
- }
-
- public TransactionServiceThriftClient getThriftClient() {
- return thriftClient;
- }
-
- @Override
- public void close() {
- // in any case, the client must be returned to the pool. The pool is
- // responsible for discarding the client if it is in a bad state.
- provider.returnClient(thriftClient);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/ElasticPool.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/ElasticPool.java b/tephra-core/src/main/java/co/cask/tephra/distributed/ElasticPool.java
deleted file mode 100644
index 843ab90..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/ElasticPool.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * An Elastic Pool is an object pool that can dynamically grow.
- * Normally, an element is obtained by a client and then returned to the pool
- * after use. However, if the element gets into a bad state, the element can
- * be discarded, based upon the recycle() method returning false. This will
- * cause the element to be removed from the pool, and for a subsequent request,
- * a new element can be created on the fly to replace the discarded one.
- *
- * The pool starts with zero (active) elements. Every time a client attempts
- * to obtain an element, an element from the pool is returned if available.
- * Otherwise, if the number of active elements is less than the pool's limit,
- * a new element is created (using abstract method create(), this must be
- * overridden by all implementations), and the number of active elements is
- * increased by one. If the limit is reached, then obtain() blocks until
- * either an element is returned to the pool or, if the obtain method with timeout
- * parameters is used, a timeout occurs.
- *
- * Every time an element is returned to the pool, it is "recycled" to restore its
- * fresh state for the next use or destroyed, depending on its state.
- *
- * @param <T> the type of the elements
- * @param <E> the type of exception thrown by create()
- */
-public abstract class ElasticPool<T, E extends Exception> {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(ElasticPool.class);
-
- /**
- * A method to create a new element. Will be called every time the pool
- * of available elements is empty but the limit of active elements is
- * not exceeded.
- * @return a new element
- */
- protected abstract T create() throws E;
-
- /**
- * A method to recycle an existing element when it is returned to the pool.
- * This methods ensures that the element is in a fresh state before it can
- * be reused by the next agent. If the element is not to be returned to the pool,
- * calling code is responsible for destroying it and returning false.
- *
- * @param element the element to recycle
- * @return true to reuse element, false to remove it from the pool
- */
- protected boolean recycle(T element) {
- // by default, simply return true
- return true;
- }
-
- // holds all currently available elements
- private final ConcurrentLinkedQueue<T> elements;
-
- // we keep track of elements via the permits of a semaphore, because there can
- // be elements in a queue, but also elements that are "loaned out" count towards
- // the pool's size limit
- private final Semaphore semaphore;
-
- public ElasticPool(int sizeLimit) {
- elements = new ConcurrentLinkedQueue<>();
- semaphore = new Semaphore(sizeLimit, true);
- }
-
- /**
- * Get a element from the pool. If there is an available element in
- * the pool, it will be returned. Otherwise, if the number of active
- * elements does not exceed the limit, a new element is created with
- * create() and returned. Otherwise, blocks until an element is either
- * released and returned to the pool, or an element is discarded,
- * allowing for the creation of a new element.
- *
- * @return an element
- */
- public T obtain() throws E, InterruptedException {
- semaphore.acquire();
- return getOrCreate();
- }
-
- /**
- * Get a element from the pool. If there is an available element in
- * the pool, it will be returned. Otherwise, if the number of active
- * elements does ot exceed the limit, a new element is created with
- * create() and returned. Otherwise, blocks until an element is either
- * released and returned to the pool, an element is discarded,
- * allowing for the creation of a new element, or a timeout occurs.
- *
- * @param timeout the timeout for trying to obtain an element
- * @param unit the timeout unit for trying to obtain an element
- * @return an element
- * @throws TimeoutException if a client is not able to be obtained within the given timeout
- */
- public T obtain(long timeout, TimeUnit unit) throws E, TimeoutException, InterruptedException {
- if (!semaphore.tryAcquire(1, timeout, unit)) {
- throw new TimeoutException(String.format("Failed to obtain client within %d %s.",
- timeout, unit));
- }
- return getOrCreate();
- }
-
- // gets an element from the queue, or creates one if there is none available in the queue.
- // the semaphore must be acquired before calling this method. The semaphore will be released from within
- // this method if it throws any exception
- private T getOrCreate() throws E {
- try {
- T client = elements.poll();
- // a client was available, all good. otherwise, create one
- if (client != null) {
- return client;
- }
- return create();
- } catch (Exception e) {
- // if an exception is thrown after acquiring the semaphore, release the
- // semaphore before propagating the exception
- semaphore.release();
- throw e;
- }
- }
-
- /**
- * Returns an element to the pool of available elements. The element must
- * have been obtained from this pool through obtain(). The recycle() method
- * is called before the element is available for obtain().
- *
- * @param element the element to be returned
- */
- public void release(T element) {
- try {
- if (recycle(element)) {
- elements.add(element);
- }
- } finally {
- semaphore.release();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/PooledClientProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/PooledClientProvider.java b/tephra-core/src/main/java/co/cask/tephra/distributed/PooledClientProvider.java
deleted file mode 100644
index 3394811..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/PooledClientProvider.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import co.cask.tephra.TxConstants;
-import com.google.common.base.Throwables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.thrift.TException;
-import org.apache.twill.discovery.DiscoveryServiceClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * This is an tx client provider that uses a bounded size pool of connections.
- */
-public class PooledClientProvider extends AbstractClientProvider {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(PooledClientProvider.class);
-
- // we will use this as a pool of tx clients
- class TxClientPool extends ElasticPool<TransactionServiceThriftClient, TException> {
- TxClientPool(int sizeLimit) {
- super(sizeLimit);
- }
-
- @Override
- protected TransactionServiceThriftClient create() throws TException {
- return newClient();
- }
-
- @Override
- protected boolean recycle(TransactionServiceThriftClient client) {
- if (!client.isValid()) {
- client.close();
- return false;
- }
- return true;
- }
- }
-
- // we will use this as a pool of tx clients
- private volatile TxClientPool clients;
-
- // the limit for the number of active clients
- private int maxClients;
- // timeout, for obtaining a client
- private long obtainClientTimeoutMs;
-
- public PooledClientProvider(Configuration conf, DiscoveryServiceClient discoveryServiceClient) {
- super(conf, discoveryServiceClient);
- }
-
- private void initializePool() throws TException {
- // initialize the super class (needed for service discovery)
- super.initialize();
-
- // create a (empty) pool of tx clients
- maxClients = configuration.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_COUNT,
- TxConstants.Service.DEFAULT_DATA_TX_CLIENT_COUNT);
- if (maxClients < 1) {
- LOG.warn("Configuration of " + TxConstants.Service.CFG_DATA_TX_CLIENT_COUNT +
- " is invalid: value is " + maxClients + " but must be at least 1. " +
- "Using 1 as a fallback. ");
- maxClients = 1;
- }
-
- obtainClientTimeoutMs =
- configuration.getLong(TxConstants.Service.CFG_DATA_TX_CLIENT_OBTAIN_TIMEOUT_MS,
- TxConstants.Service.DEFAULT_DATA_TX_CLIENT_OBTAIN_TIMEOUT_MS);
- if (obtainClientTimeoutMs < 0) {
- LOG.warn("Configuration of " + TxConstants.Service.CFG_DATA_TX_CLIENT_COUNT +
- " is invalid: value is " + obtainClientTimeoutMs + " but must be at least 0. " +
- "Using 0 as a fallback. ");
- obtainClientTimeoutMs = 0;
- }
- this.clients = new TxClientPool(maxClients);
- }
-
- @Override
- public CloseableThriftClient getCloseableClient() throws TException, TimeoutException, InterruptedException {
- TransactionServiceThriftClient client = getClientPool().obtain(obtainClientTimeoutMs, TimeUnit.MILLISECONDS);
- return new CloseableThriftClient(this, client);
- }
-
- @Override
- public void returnClient(TransactionServiceThriftClient client) {
- getClientPool().release(client);
- }
-
- @Override
- public String toString() {
- return "Elastic pool of size " + this.maxClients +
- ", with timeout (in milliseconds): " + this.obtainClientTimeoutMs;
- }
-
- private TxClientPool getClientPool() {
- if (clients != null) {
- return clients;
- }
-
- synchronized (this) {
- if (clients == null) {
- try {
- initializePool();
- } catch (TException e) {
- LOG.error("Failed to initialize Tx client provider", e);
- throw Throwables.propagate(e);
- }
- }
- }
- return clients;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/RetryNTimes.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryNTimes.java b/tephra-core/src/main/java/co/cask/tephra/distributed/RetryNTimes.java
deleted file mode 100644
index 22a4015..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryNTimes.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import co.cask.tephra.TxConstants;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * A retry strategy that makes N attempts and then gives up. This does
- * not do anything before the re-attempt - extend this class to add a
- * sleep or similar.
- */
-public class RetryNTimes extends RetryStrategy {
-
- int attempts = 0;
- int limit;
-
- /**
- * @param maxAttempts the number of attempts after which to stop
- */
- protected RetryNTimes(int maxAttempts) {
- limit = maxAttempts;
- }
-
- @Override
- boolean failOnce() {
- ++attempts;
- return attempts < limit;
- }
-
- /**
- * A retry strategy provider for this strategy.
- */
- public static class Provider implements RetryStrategyProvider {
-
- int nTimes;
-
- public Provider() {
- this.nTimes = TxConstants.Service.DEFAULT_DATA_TX_CLIENT_ATTEMPTS;
- }
-
- @Override
- public void configure(Configuration config) {
- nTimes = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, nTimes);
- }
-
- @Override
- public RetryStrategy newRetryStrategy() {
- return new RetryNTimes(nTimes);
- }
-
- @Override
- public String toString() {
- return nTimes + " attempts without delay";
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategy.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategy.java b/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategy.java
deleted file mode 100644
index 80ae779..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategy.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-/**
- * A retry strategy is an abstraction over how the remote tx client shuold retry operations after connection
- * failures.
- */
-public abstract class RetryStrategy {
-
- /**
- * Increments the number of failed attempts.
- * @return whether another attempt should be made
- */
- abstract boolean failOnce();
-
- /**
- * Should be called before re-attempting. This can, for instance
- * inject a sleep time between retries. Default implementation is
- * to do nothing.
- */
- void beforeRetry() {
- // do nothinhg
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategyProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategyProvider.java b/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategyProvider.java
deleted file mode 100644
index 3ea2e94..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategyProvider.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * A retry strategy provider is used by the tx client to get a new retry strategy for every call.
- */
-public interface RetryStrategyProvider {
-
- /**
- * Provides a new instance of a retry strategy.
- * @return a retry strategy
- */
- RetryStrategy newRetryStrategy();
-
- /**
- * Configure the strategy.
- * @param config the configuration
- */
- void configure(Configuration config);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/RetryWithBackoff.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryWithBackoff.java b/tephra-core/src/main/java/co/cask/tephra/distributed/RetryWithBackoff.java
deleted file mode 100644
index 3226ba7..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryWithBackoff.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import co.cask.tephra.TxConstants;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A retry strategy that makes N attempts and then gives up. This does
- * not do anything before the re-attempt - extend this class to add a
- * sleep or similar.
- */
-public class RetryWithBackoff extends RetryStrategy {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(RetryWithBackoff.class);
-
- int initialSleep; // initial sleep time
- int backoffFactor; // factor by which to increase sleep for each retry
- int maxSleep; // max sleep time. stop retrying when we exceed this
- int sleep; // current sleep time
-
- /**
- * @param initial the initial sleep time (before first retry)
- * @param backoff the backoff factor by which sleep time is multiplied
- * after each retry
- * @param limit the max sleep time. if sleep time reaches this limit, we
- * stop retrying
- */
- protected RetryWithBackoff(int initial, int backoff, int limit) {
- initialSleep = initial;
- backoffFactor = backoff;
- maxSleep = limit;
- sleep = initialSleep;
- }
-
- @Override
- boolean failOnce() {
- return sleep < maxSleep;
- }
-
- @Override
- void beforeRetry() {
- LOG.info("Sleeping " + sleep + " ms before retry.");
- long current = System.currentTimeMillis();
- long end = current + sleep;
- while (current < end) {
- try {
- Thread.sleep(end - current);
- } catch (InterruptedException e) {
- // do nothing
- }
- current = System.currentTimeMillis();
- }
- sleep = sleep * backoffFactor;
- }
-
- /**
- * A provider for this retry strategy.
- */
- public static class Provider implements RetryStrategyProvider {
-
- int initialSleep; // initial sleep time
- int backoffFactor; // factor by which to increase sleep for each retry
- int maxSleep; // max sleep time. stop retrying when we exceed this
-
- public Provider() {
- initialSleep = TxConstants.Service.DEFAULT_DATA_TX_CLIENT_BACKOFF_INITIAL;
- backoffFactor = TxConstants.Service.DEFAULT_DATA_TX_CLIENT_BACKOFF_FACTOR;
- maxSleep = TxConstants.Service.DEFAULT_DATA_TX_CLIENT_BACKOFF_LIMIT;
- }
-
- public void configure(Configuration config) {
- initialSleep = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_BACKOFF_INITIAL, initialSleep);
- backoffFactor = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_BACKOFF_FACTOR, backoffFactor);
- maxSleep = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_BACKOFF_LIMIT, maxSleep);
- }
-
- @Override
- public RetryStrategy newRetryStrategy() {
- return new RetryWithBackoff(initialSleep, backoffFactor, maxSleep);
- }
-
- @Override
- public String toString() {
- return "sleep " + initialSleep + " ms with back off factor " +
- backoffFactor + " and limit " + maxSleep + " ms";
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/SingleUseClientProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/SingleUseClientProvider.java b/tephra-core/src/main/java/co/cask/tephra/distributed/SingleUseClientProvider.java
deleted file mode 100644
index 8673ca4..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/SingleUseClientProvider.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.thrift.TException;
-import org.apache.twill.discovery.DiscoveryServiceClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeoutException;
-
-/**
- * An tx client provider that creates a new connection every time.
- */
-public class SingleUseClientProvider extends AbstractClientProvider {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(SingleUseClientProvider.class);
-
- public SingleUseClientProvider(Configuration conf, DiscoveryServiceClient discoveryServiceClient, int timeout) {
- super(conf, discoveryServiceClient);
- this.timeout = timeout;
- }
-
- final int timeout;
-
- @Override
- public CloseableThriftClient getCloseableClient() throws TException, TimeoutException, InterruptedException {
- try {
- return new CloseableThriftClient(this, newClient(timeout));
- } catch (TException e) {
- LOG.error("Unable to create new tx client: " + e.getMessage());
- throw e;
- }
- }
-
- @Override
- public void returnClient(TransactionServiceThriftClient client) {
- client.close();
- }
-
- @Override
- public String toString() {
- return "Single-use(timeout = " + timeout + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/ThreadLocalClientProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/ThreadLocalClientProvider.java b/tephra-core/src/main/java/co/cask/tephra/distributed/ThreadLocalClientProvider.java
deleted file mode 100644
index eac75bd..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/ThreadLocalClientProvider.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.thrift.TException;
-import org.apache.twill.discovery.DiscoveryServiceClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeoutException;
-
-/**
- * An tx client provider that uses thread local to maintain at most one open connection per thread.
- * Note that there can be a connection leak if the threads are recycled.
- */
-public class ThreadLocalClientProvider extends AbstractClientProvider {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(ThreadLocalClientProvider.class);
-
- ThreadLocal<TransactionServiceThriftClient> clients = new ThreadLocal<>();
-
- public ThreadLocalClientProvider(Configuration conf, DiscoveryServiceClient discoveryServiceClient) {
- super(conf, discoveryServiceClient);
- }
-
- @Override
- public CloseableThriftClient getCloseableClient() throws TException, TimeoutException, InterruptedException {
- TransactionServiceThriftClient client = this.clients.get();
- if (client == null) {
- try {
- client = this.newClient();
- clients.set(client);
- } catch (TException e) {
- LOG.error("Unable to create new tx client for thread: "
- + e.getMessage());
- throw e;
- }
- }
- return new CloseableThriftClient(this, client);
- }
-
- @Override
- public void returnClient(TransactionServiceThriftClient client) {
- if (!client.isValid()) {
- client.close();
- clients.remove();
- }
- }
-
- @Override
- public String toString() {
- return "Thread-local";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/ThriftClientProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/ThriftClientProvider.java b/tephra-core/src/main/java/co/cask/tephra/distributed/ThriftClientProvider.java
deleted file mode 100644
index 1e86c08..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/ThriftClientProvider.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import org.apache.thrift.TException;
-
-import java.util.concurrent.TimeoutException;
-
-/**
- * This interface is used to provide thrift tx service clients:
- * there is only one (singleton)
- * tx service per JVM, but many threads may use it concurrently.
- * However, being a thrift client, it is not thread-safe. In
- * order to avoid serializing all tx calls by synchronizing
- * on the tx service client, we employ a pool of clients. But
- * in different scenarios there are different strategies for
- * pooling: If there are many short-lived threads, it is wise
- * to have a shared pool between all threads. But if there are
- * few long-lived threads, it may be better to have thread-local
- * client for each thread.
- *
- * This interface provides an abstraction of the pooling strategy.
- */
-public interface ThriftClientProvider {
-
- /**
- * Initialize the provider. At this point, it should be verified
- * that tx service is up and running and getClient() can
- * create new clients when necessary.
- */
- void initialize() throws TException;
-
- /**
- * Retrieve an AutoCloseable wrapper around tx client for exclusive use by the
- * current thread. The client must be closed (returned) to the provider after use.
- * @return an tx client, connected and fully functional
- */
- CloseableThriftClient getCloseableClient() throws TException,
- TimeoutException, InterruptedException;
-
- /**
- * Release an tx client back to the provider's pool, if the client is valid.
- * If the client becomes disfunctional, for instance, due to a socket
- * exception. The provider must make sure to close the client, and it
- * must remove the client from its arsenal and be prepared to create
- * a new client subsequently.
- *
- * @param client The client to return
- */
- void returnClient(TransactionServiceThriftClient client);
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionConverterUtils.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionConverterUtils.java b/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionConverterUtils.java
deleted file mode 100644
index bda39fd..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionConverterUtils.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionType;
-import co.cask.tephra.distributed.thrift.TTransaction;
-import co.cask.tephra.distributed.thrift.TTransactionType;
-import co.cask.tephra.distributed.thrift.TVisibilityLevel;
-import com.google.common.primitives.Longs;
-
-/**
- * Utility methods to convert to thrift and back.
- */
-public final class TransactionConverterUtils {
- private static final long[] EMPTY_LONG_ARRAY = {};
-
- public static TTransaction wrap(Transaction tx) {
- return new TTransaction(tx.getTransactionId(), tx.getReadPointer(),
- Longs.asList(tx.getInvalids()), Longs.asList(tx.getInProgress()),
- tx.getFirstShortInProgress(), getTTransactionType(tx.getType()),
- tx.getWritePointer(), Longs.asList(tx.getCheckpointWritePointers()),
- getTVisibilityLevel(tx.getVisibilityLevel()));
- }
-
- public static Transaction unwrap(TTransaction thriftTx) {
- return new Transaction(thriftTx.getReadPointer(), thriftTx.getTransactionId(), thriftTx.getWritePointer(),
- thriftTx.getInvalids() == null ? EMPTY_LONG_ARRAY : Longs.toArray(thriftTx.getInvalids()),
- thriftTx.getInProgress() == null ? EMPTY_LONG_ARRAY :
- Longs.toArray(thriftTx.getInProgress()),
- thriftTx.getFirstShort(), getTransactionType(thriftTx.getType()),
- thriftTx.getCheckpointWritePointers() == null ? EMPTY_LONG_ARRAY :
- Longs.toArray(thriftTx.getCheckpointWritePointers()),
- getVisibilityLevel(thriftTx.getVisibilityLevel()));
- }
-
- private static TransactionType getTransactionType(TTransactionType tType) {
- return tType == TTransactionType.SHORT ? TransactionType.SHORT : TransactionType.LONG;
- }
-
- private static TTransactionType getTTransactionType(TransactionType type) {
- return type == TransactionType.SHORT ? TTransactionType.SHORT : TTransactionType.LONG;
- }
-
- private static Transaction.VisibilityLevel getVisibilityLevel(TVisibilityLevel tLevel) {
- // default to SNAPSHOT
- if (tLevel == null) {
- return Transaction.VisibilityLevel.SNAPSHOT;
- }
-
- switch (tLevel) {
- case SNAPSHOT:
- return Transaction.VisibilityLevel.SNAPSHOT;
- case SNAPSHOT_EXCLUDE_CURRENT:
- return Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
- case SNAPSHOT_ALL:
- return Transaction.VisibilityLevel.SNAPSHOT_ALL;
- default:
- throw new IllegalArgumentException("Unknown TVisibilityLevel: " + tLevel);
- }
- }
-
- private static TVisibilityLevel getTVisibilityLevel(Transaction.VisibilityLevel level) {
- switch (level) {
- case SNAPSHOT:
- return TVisibilityLevel.SNAPSHOT;
- case SNAPSHOT_EXCLUDE_CURRENT:
- return TVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
- case SNAPSHOT_ALL:
- return TVisibilityLevel.SNAPSHOT_ALL;
- default:
- throw new IllegalArgumentException("Unknown VisibilityLevel: " + level);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionService.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionService.java b/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionService.java
deleted file mode 100644
index 22e986d..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionService.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.distributed.thrift.TTransactionServer;
-import co.cask.tephra.inmemory.InMemoryTransactionService;
-import co.cask.tephra.rpc.ThriftRPCServer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.twill.api.ElectionHandler;
-import org.apache.twill.discovery.DiscoveryService;
-import org.apache.twill.internal.ServiceListenerAdapter;
-import org.apache.twill.internal.zookeeper.LeaderElection;
-import org.apache.twill.zookeeper.ZKClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- *
- */
-public final class TransactionService extends InMemoryTransactionService {
- private static final Logger LOG = LoggerFactory.getLogger(TransactionService.class);
- private LeaderElection leaderElection;
- private final ZKClient zkClient;
-
- private ThriftRPCServer<TransactionServiceThriftHandler, TTransactionServer> server;
-
- @Inject
- public TransactionService(Configuration conf,
- ZKClient zkClient,
- DiscoveryService discoveryService,
- Provider<TransactionManager> txManagerProvider) {
- super(conf, discoveryService, txManagerProvider);
- this.zkClient = zkClient;
- }
-
- @Override
- protected InetSocketAddress getAddress() {
- if (address.equals("0.0.0.0")) {
- // resolve hostname
- try {
- return new InetSocketAddress(InetAddress.getLocalHost().getHostName(), server.getBindAddress().getPort());
- } catch (UnknownHostException x) {
- LOG.error("Cannot resolve hostname for 0.0.0.0", x);
- }
- }
- return server.getBindAddress();
- }
-
- @Override
- protected void doStart() {
- leaderElection = new LeaderElection(zkClient, "/tx.service/leader", new ElectionHandler() {
- @Override
- public void leader() {
- // if the txManager fails, we should stop the server
- txManager = txManagerProvider.get();
- txManager.addListener(new ServiceListenerAdapter() {
- @Override
- public void failed(State from, Throwable failure) {
- LOG.error("Transaction manager aborted, stopping transaction service");
- TransactionService.this.abort(failure);
- }
- }, MoreExecutors.sameThreadExecutor());
-
- server = ThriftRPCServer.builder(TTransactionServer.class)
- .setHost(address)
- .setPort(port)
- .setWorkerThreads(threads)
- .setMaxReadBufferBytes(maxReadBufferBytes)
- .setIOThreads(ioThreads)
- .build(new TransactionServiceThriftHandler(txManager));
- try {
- server.startAndWait();
- doRegister();
- LOG.info("Transaction Thrift Service started successfully on " + getAddress());
- } catch (Throwable t) {
- LOG.info("Transaction Thrift Service didn't start on " + server.getBindAddress());
- leaderElection.stop();
- notifyFailed(t);
- }
- }
-
- @Override
- public void follower() {
- // First stop the transaction server as un-registering from discovery can block sometimes.
- // That can lead to multiple transaction servers being active at the same time.
- if (server != null && server.isRunning()) {
- server.stopAndWait();
- }
- undoRegister();
- }
- });
- leaderElection.start();
-
- notifyStarted();
- }
-
- @VisibleForTesting
- State thriftRPCServerState() {
- return server.state();
- }
-
- @Override
- protected void doStop() {
- internalStop();
- notifyStopped();
- }
-
- protected void abort(Throwable cause) {
- // try to clear leader status and shutdown RPC
- internalStop();
- notifyFailed(cause);
- }
-
- protected void internalStop() {
- if (leaderElection != null) {
- // NOTE: if was a leader this will cause loosing of leadership which in callback above will
- // de-register service in discovery service and stop the service if needed
- try {
- Uninterruptibles.getUninterruptibly(leaderElection.stop(), 5, TimeUnit.SECONDS);
- } catch (TimeoutException te) {
- LOG.warn("Timed out waiting for leader election cancellation to complete");
- } catch (ExecutionException e) {
- LOG.error("Exception when cancelling leader election.", e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceClient.java b/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceClient.java
deleted file mode 100644
index 4f29730..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceClient.java
+++ /dev/null
@@ -1,473 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import co.cask.tephra.InvalidTruncateTimeException;
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionCouldNotTakeSnapshotException;
-import co.cask.tephra.TransactionNotInProgressException;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.runtime.ConfigModule;
-import co.cask.tephra.runtime.DiscoveryModules;
-import co.cask.tephra.runtime.TransactionClientModule;
-import co.cask.tephra.runtime.TransactionModules;
-import co.cask.tephra.runtime.ZKModule;
-import co.cask.tephra.util.ConfigurationFactory;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
-import com.google.inject.Guice;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.thrift.TException;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Set;
-
-/**
- * A tx service client
- */
-public class TransactionServiceClient implements TransactionSystemClient {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(TransactionServiceClient.class);
-
- // we will use this to provide every call with an tx client
- private ThriftClientProvider clientProvider;
-
- // the retry strategy we will use
- private final RetryStrategyProvider retryStrategyProvider;
-
- /**
- * Utility to be used for basic verification of transaction system availability and functioning
- * @param args arguments list, accepts single option "-v" that makes it to print out more details about started tx
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- if (args.length > 1 || (args.length == 1 && !"-v".equals(args[0]))) {
- System.out.println("USAGE: TransactionServiceClient [-v]");
- }
-
- boolean verbose = false;
- if (args.length == 1 && "-v".equals(args[0])) {
- verbose = true;
- }
- doMain(verbose, new ConfigurationFactory().get());
- }
-
- @VisibleForTesting
- public static void doMain(boolean verbose, Configuration conf) throws Exception {
- LOG.info("Starting tx server client test.");
- Injector injector = Guice.createInjector(
- new ConfigModule(conf),
- new ZKModule(),
- new DiscoveryModules().getDistributedModules(),
- new TransactionModules().getDistributedModules(),
- new TransactionClientModule()
- );
-
- ZKClientService zkClient = injector.getInstance(ZKClientService.class);
- zkClient.startAndWait();
-
- try {
- TransactionServiceClient client = injector.getInstance(TransactionServiceClient.class);
- LOG.info("Starting tx...");
- Transaction tx = client.startShort();
- if (verbose) {
- LOG.info("Started tx details: " + tx.toString());
- } else {
- LOG.info("Started tx: " + tx.getTransactionId() +
- ", readPointer: " + tx.getReadPointer() +
- ", invalids: " + tx.getInvalids().length +
- ", inProgress: " + tx.getInProgress().length);
- }
- LOG.info("Checking if canCommit tx...");
- boolean canCommit = client.canCommit(tx, Collections.<byte[]>emptyList());
- LOG.info("canCommit: " + canCommit);
- if (canCommit) {
- LOG.info("Committing tx...");
- boolean committed = client.commit(tx);
- LOG.info("Committed tx: " + committed);
- if (!committed) {
- LOG.info("Aborting tx...");
- client.abort(tx);
- LOG.info("Aborted tx...");
- }
- } else {
- LOG.info("Aborting tx...");
- client.abort(tx);
- LOG.info("Aborted tx...");
- }
- } finally {
- zkClient.stopAndWait();
- }
- }
-
- /**
- * Create from a configuration. This will first attempt to find a zookeeper
- * for service discovery. Otherwise it will look for the port in the
- * config and use localhost.
- * @param config a configuration containing the zookeeper properties
- */
- @Inject
- public TransactionServiceClient(Configuration config,
- ThriftClientProvider clientProvider) {
-
- // initialize the retry logic
- String retryStrat = config.get(
- TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY,
- TxConstants.Service.DEFAULT_DATA_TX_CLIENT_RETRY_STRATEGY);
- if ("backoff".equals(retryStrat)) {
- this.retryStrategyProvider = new RetryWithBackoff.Provider();
- } else if ("n-times".equals(retryStrat)) {
- this.retryStrategyProvider = new RetryNTimes.Provider();
- } else {
- String message = "Unknown Retry Strategy '" + retryStrat + "'.";
- LOG.error(message);
- throw new IllegalArgumentException(message);
- }
- this.retryStrategyProvider.configure(config);
- LOG.debug("Retry strategy is " + this.retryStrategyProvider);
-
- this.clientProvider = clientProvider;
- }
-
- /**
- * This is an abstract class that encapsulates an operation. It provides a
- * method to attempt the actual operation, and it can throw an operation
- * exception.
- * @param <T> The return type of the operation
- */
- abstract static class Operation<T> {
-
- /** the name of the operation. */
- String name;
-
- /** constructor with name of operation. */
- Operation(String name) {
- this.name = name;
- }
-
- /** return the name of the operation. */
- String getName() {
- return name;
- }
-
- /** execute the operation, given an tx client. */
- abstract T execute(TransactionServiceThriftClient client)
- throws Exception;
- }
-
- /** see execute(operation, client). */
- private <T> T execute(Operation<T> operation) throws Exception {
- return execute(operation, null);
- }
-
- /**
- * This is a generic method implementing the somewhat complex execution
- * and retry logic for operations, to avoid repetitive code.
- *
- * Attempts to execute one operation, by obtaining an tx client from
- * the client provider and passing the operation to the client. If the
- * call fails with a Thrift exception, apply the retry strategy. If no
- * more retries are to be made according to the strategy, call the
- * operation's error method to obtain a value to return. Note that error()
- * may also throw an exception. Note also that the retry logic is only
- * applied for thrift exceptions.
- *
- * @param operation The operation to be executed
- * @param provider An tx client provider. If null, then a client will be
- * obtained using the client provider
- * @param <T> The return type of the operation
- * @return the result of the operation, or a value returned by error()
- */
- private <T> T execute(Operation<T> operation, ThriftClientProvider provider) throws Exception {
- RetryStrategy retryStrategy = retryStrategyProvider.newRetryStrategy();
- while (true) {
- // did we get a custom client provider or do we use the default?
- if (provider == null) {
- provider = this.clientProvider;
- }
- // this will throw a TException if it cannot get a client
- try (CloseableThriftClient closeable = provider.getCloseableClient()) {
- // note that this can throw exceptions other than TException
- return operation.execute(closeable.getThriftClient());
-
- } catch (TException te) {
- // determine whether we should retry
- boolean retry = retryStrategy.failOnce();
- if (!retry) {
- // retry strategy is exceeded, throw an operation exception
- String message =
- "Thrift error for " + operation + ": " + te.getMessage();
- LOG.error(message);
- LOG.debug(message, te);
- throw new Exception(message, te);
- } else {
- // call retry strategy before retrying
- retryStrategy.beforeRetry();
- String msg = "Retrying " + operation.getName() + " after Thrift error: " + te.getMessage();
- LOG.info(msg);
- LOG.debug(msg, te);
- }
-
- }
- }
- }
-
- @Override
- public Transaction startLong() {
- try {
- return execute(
- new Operation<Transaction>("startLong") {
- @Override
- public Transaction execute(TransactionServiceThriftClient client)
- throws TException {
- return client.startLong();
- }
- });
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public Transaction startShort() {
- try {
- return execute(
- new Operation<Transaction>("startShort") {
- @Override
- public Transaction execute(TransactionServiceThriftClient client)
- throws TException {
- return client.startShort();
- }
- });
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public Transaction startShort(final int timeout) {
- try {
- return execute(
- new Operation<Transaction>("startShort") {
- @Override
- public Transaction execute(TransactionServiceThriftClient client)
- throws TException {
- return client.startShort(timeout);
- }
- });
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public boolean canCommit(final Transaction tx, final Collection<byte[]> changeIds)
- throws TransactionNotInProgressException {
-
- try {
- return execute(
- new Operation<Boolean>("canCommit") {
- @Override
- public Boolean execute(TransactionServiceThriftClient client)
- throws Exception {
- return client.canCommit(tx, changeIds);
- }
- });
- } catch (TransactionNotInProgressException e) {
- throw e;
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public boolean commit(final Transaction tx) throws TransactionNotInProgressException {
- try {
- return this.execute(
- new Operation<Boolean>("commit") {
- @Override
- public Boolean execute(TransactionServiceThriftClient client)
- throws Exception {
- return client.commit(tx);
- }
- });
- } catch (TransactionNotInProgressException e) {
- throw e;
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public void abort(final Transaction tx) {
- try {
- this.execute(
- new Operation<Boolean>("abort") {
- @Override
- public Boolean execute(TransactionServiceThriftClient client)
- throws TException {
- client.abort(tx);
- return true;
- }
- });
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public boolean invalidate(final long tx) {
- try {
- return this.execute(
- new Operation<Boolean>("invalidate") {
- @Override
- public Boolean execute(TransactionServiceThriftClient client)
- throws TException {
- return client.invalidate(tx);
- }
- });
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public Transaction checkpoint(final Transaction tx) throws TransactionNotInProgressException {
- try {
- return this.execute(
- new Operation<Transaction>("checkpoint") {
- @Override
- Transaction execute(TransactionServiceThriftClient client) throws Exception {
- return client.checkpoint(tx);
- }
- }
- );
- } catch (TransactionNotInProgressException e) {
- throw e;
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException {
- try {
- return this.execute(
- new Operation<InputStream>("takeSnapshot") {
- @Override
- public InputStream execute(TransactionServiceThriftClient client)
- throws Exception {
- return client.getSnapshotStream();
- }
- });
- } catch (TransactionCouldNotTakeSnapshotException e) {
- throw e;
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public String status() {
- try {
- return this.execute(
- new Operation<String>("status") {
- @Override
- public String execute(TransactionServiceThriftClient client) throws Exception {
- return client.status();
- }
- });
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public void resetState() {
- try {
- this.execute(
- new Operation<Boolean>("resetState") {
- @Override
- public Boolean execute(TransactionServiceThriftClient client)
- throws TException {
- client.resetState();
- return true;
- }
- });
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public boolean truncateInvalidTx(final Set<Long> invalidTxIds) {
- try {
- return this.execute(
- new Operation<Boolean>("truncateInvalidTx") {
- @Override
- public Boolean execute(TransactionServiceThriftClient client) throws TException {
- return client.truncateInvalidTx(invalidTxIds);
- }
- });
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public boolean truncateInvalidTxBefore(final long time) throws InvalidTruncateTimeException {
- try {
- return this.execute(
- new Operation<Boolean>("truncateInvalidTxBefore") {
- @Override
- public Boolean execute(TransactionServiceThriftClient client) throws Exception {
- return client.truncateInvalidTxBefore(time);
- }
- });
- } catch (InvalidTruncateTimeException e) {
- throw e;
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-
- @Override
- public int getInvalidSize() {
- try {
- return this.execute(
- new Operation<Integer>("getInvalidSize") {
- @Override
- public Integer execute(TransactionServiceThriftClient client) throws TException {
- return client.getInvalidSize();
- }
- });
- } catch (Exception e) {
- throw Throwables.propagate(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceThriftClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceThriftClient.java b/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceThriftClient.java
deleted file mode 100644
index 01f9b4c..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceThriftClient.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import co.cask.tephra.InvalidTruncateTimeException;
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionCouldNotTakeSnapshotException;
-import co.cask.tephra.TransactionNotInProgressException;
-import co.cask.tephra.distributed.thrift.TInvalidTruncateTimeException;
-import co.cask.tephra.distributed.thrift.TTransactionCouldNotTakeSnapshotException;
-import co.cask.tephra.distributed.thrift.TTransactionNotInProgressException;
-import co.cask.tephra.distributed.thrift.TTransactionServer;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * This class is a wrapper around the thrift tx service client, it takes
- * Operations, converts them into thrift objects, calls the thrift
- * client, and converts the results back to data fabric classes.
- * This class also instruments the thrift calls with metrics.
- */
-public class TransactionServiceThriftClient {
- private static final Function<byte[], ByteBuffer> BYTES_WRAPPER = new Function<byte[], ByteBuffer>() {
- @Override
- public ByteBuffer apply(byte[] input) {
- return ByteBuffer.wrap(input);
- }
- };
-
- /**
- * The thrift transport layer. We need this when we close the connection.
- */
- TTransport transport;
-
- /**
- * The actual thrift client.
- */
- TTransactionServer.Client client;
-
- /**
- * Whether this client is valid for use.
- */
- private final AtomicBoolean isValid = new AtomicBoolean(true);
-
- /**
- * Constructor from an existing, connected thrift transport.
- *
- * @param transport the thrift transport layer. It must already be connected
- */
- public TransactionServiceThriftClient(TTransport transport) {
- this.transport = transport;
- // thrift protocol layer, we use binary because so does the service
- TProtocol protocol = new TBinaryProtocol(transport);
- // and create a thrift client
- this.client = new TTransactionServer.Client(protocol);
- }
-
- /**
- * close this client. may be called multiple times
- */
- public void close() {
- if (this.transport.isOpen()) {
- this.transport.close();
- }
- }
-
- public Transaction startLong() throws TException {
- try {
- return TransactionConverterUtils.unwrap(client.startLong());
- } catch (TException e) {
- isValid.set(false);
- throw e;
- }
- }
-
- public Transaction startShort() throws TException {
- try {
- return TransactionConverterUtils.unwrap(client.startShort());
- } catch (TException e) {
- isValid.set(false);
- throw e;
- }
- }
-
- public Transaction startShort(int timeout) throws TException {
- try {
- return TransactionConverterUtils.unwrap(client.startShortTimeout(timeout));
- } catch (TException e) {
- isValid.set(false);
- throw e;
- }
- }
-
- public boolean canCommit(Transaction tx, Collection<byte[]> changeIds)
- throws TException, TransactionNotInProgressException {
- try {
- return client.canCommitTx(TransactionConverterUtils.wrap(tx),
- ImmutableSet.copyOf(Iterables.transform(changeIds, BYTES_WRAPPER))).isValue();
- } catch (TTransactionNotInProgressException e) {
- throw new TransactionNotInProgressException(e.getMessage());
- } catch (TException e) {
- isValid.set(false);
- throw e;
- }
- }
-
-
-
- public boolean commit(Transaction tx) throws TException, TransactionNotInProgressException {
- try {
- return client.commitTx(TransactionConverterUtils.wrap(tx)).isValue();
- } catch (TTransactionNotInProgressException e) {
- throw new TransactionNotInProgressException(e.getMessage());
- } catch (TException e) {
- isValid.set(false);
- throw e;
- }
- }
-
- public void abort(Transaction tx) throws TException {
- try {
- client.abortTx(TransactionConverterUtils.wrap(tx));
- } catch (TException e) {
- isValid.set(false);
- throw e;
- }
- }
-
- public boolean invalidate(long tx) throws TException {
- try {
- return client.invalidateTx(tx);
- } catch (TException e) {
- isValid.set(false);
- throw e;
- }
- }
-
- public Transaction checkpoint(Transaction tx) throws TException {
- try {
- return TransactionConverterUtils.unwrap(client.checkpoint(TransactionConverterUtils.wrap(tx)));
- } catch (TException e) {
- isValid.set(false);
- throw e;
- }
- }
-
- public InputStream getSnapshotStream() throws TException, TransactionCouldNotTakeSnapshotException {
- try {
- ByteBuffer buffer = client.getSnapshot();
- if (buffer.hasArray()) {
- return new ByteArrayInputStream(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
- }
-
- // The ByteBuffer is not backed by array. Read the content to a new byte array and return an InputStream of that.
- byte[] snapshot = new byte[buffer.remaining()];
- buffer.get(snapshot);
- return new ByteArrayInputStream(snapshot);
- } catch (TTransactionCouldNotTakeSnapshotException e) {
- throw new TransactionCouldNotTakeSnapshotException(e.getMessage());
- } catch (TException e) {
- isValid.set(false);
- throw e;
- }
- }
-
- public String status() throws TException {
- try {
- return client.status();
- } catch (TException e) {
- isValid.set(false);
- throw e;
- }
- }
-
- public void resetState() throws TException {
- try {
- client.resetState();
- } catch (TException e) {
- isValid.set(false);
- throw e;
- }
- }
-
- public boolean truncateInvalidTx(Set<Long> invalidTxIds) throws TException {
- try {
- return client.truncateInvalidTx(invalidTxIds).isValue();
- } catch (TException e) {
- isValid.set(false);
- throw e;
- }
- }
-
- public boolean truncateInvalidTxBefore(long time) throws TException, InvalidTruncateTimeException {
- try {
- return client.truncateInvalidTxBefore(time).isValue();
- } catch (TInvalidTruncateTimeException e) {
- throw new InvalidTruncateTimeException(e.getMessage());
- } catch (TException e) {
- isValid.set(false);
- throw e;
- }
- }
-
- public int getInvalidSize() throws TException {
- try {
- return client.invalidTxSize();
- } catch (TException e) {
- isValid.set(false);
- throw e;
- }
- }
-
- public boolean isValid() {
- return isValid.get();
- }
-}