You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 20:15:48 UTC

[48/56] [abbrv] [partial] incubator-tephra git commit: Rename package to org.apache.tephra

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCache.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCache.java b/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCache.java
deleted file mode 100644
index 2745c76..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCache.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.coprocessor;
-
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.persist.HDFSTransactionStateStorage;
-import co.cask.tephra.persist.TransactionStateStorage;
-import co.cask.tephra.persist.TransactionVisibilityState;
-import co.cask.tephra.snapshot.SnapshotCodecProvider;
-import co.cask.tephra.util.ConfigurationFactory;
-import com.google.common.util.concurrent.AbstractIdleService;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Periodically refreshes transaction state from the latest stored snapshot.  This is implemented as a singleton
- * to allow a single cache to be shared by all regions on a regionserver.
- */
-public class TransactionStateCache extends AbstractIdleService implements Configurable {
-  private static final Log LOG = LogFactory.getLog(TransactionStateCache.class);
-
-  // how frequently we should wake to check for changes (in seconds)
-  private static final long CHECK_FREQUENCY = 15;
-
-  private Configuration hConf;
-
-  private TransactionStateStorage storage;
-  private volatile TransactionVisibilityState latestState;
-
-  private Thread refreshService;
-  private long lastRefresh;
-  // snapshot refresh frequency in milliseconds
-  private long snapshotRefreshFrequency;
-  private boolean initialized;
-
-  public TransactionStateCache() {
-  }
-
-  @Override
-  public Configuration getConf() {
-    return hConf;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    this.hConf = conf;
-  }
-
-  @Override
-  protected void startUp() throws Exception {
-    refreshState();
-    startRefreshService();
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    this.refreshService.interrupt();
-    this.storage.stop();
-  }
-
-  /**
-   * Try to initialize the Configuration and TransactionStateStorage instances.  Obtaining the Configuration may
-   * fail until ReactorServiceMain has been started.
-   */
-  private void tryInit() {
-    try {
-      Configuration conf = getSnapshotConfiguration();
-      if (conf != null) {
-        // Since this is only used for background loading of transaction snapshots, we use the no-op metrics collector,
-        // as there are no relevant metrics to report
-        this.storage = new HDFSTransactionStateStorage(conf, new SnapshotCodecProvider(conf),
-                                                       new TxMetricsCollector());
-        this.storage.startAndWait();
-        this.snapshotRefreshFrequency = conf.getLong(TxConstants.Manager.CFG_TX_SNAPSHOT_INTERVAL,
-                                                     TxConstants.Manager.DEFAULT_TX_SNAPSHOT_INTERVAL) * 1000;
-        this.initialized = true;
-      } else {
-        LOG.info("Could not load configuration");
-      }
-    } catch (Exception e) {
-      LOG.info("Failed to initialize TransactionStateCache due to: " + e.getMessage());
-    }
-  }
-
-  protected Configuration getSnapshotConfiguration() throws IOException {
-    Configuration conf = new ConfigurationFactory().get(hConf);
-    conf.unset(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
-    return conf;
-  }
-
-  private void reset() {
-    this.storage.stop();
-    this.lastRefresh = 0;
-    this.initialized = false;
-  }
-
-  private void startRefreshService() {
-    this.refreshService = new Thread("tx-state-refresh") {
-      @Override
-      public void run() {
-        while (!isInterrupted()) {
-          if (latestState == null || System.currentTimeMillis() > (lastRefresh + snapshotRefreshFrequency)) {
-            try {
-              refreshState();
-            } catch (IOException ioe) {
-              LOG.info("Error refreshing transaction state cache: " + ioe.getMessage());
-            }
-          }
-          try {
-            TimeUnit.SECONDS.sleep(CHECK_FREQUENCY);
-          } catch (InterruptedException ie) {
-            // reset status
-            interrupt();
-            break;
-          }
-        }
-        LOG.info("Exiting thread " + getName());
-      }
-    };
-    this.refreshService.setDaemon(true);
-    this.refreshService.start();
-  }
-
-  private void refreshState() throws IOException {
-    if (!initialized) {
-      tryInit();
-    }
-
-    // only continue if initialization was successful
-    if (initialized) {
-      long now = System.currentTimeMillis();
-      TransactionVisibilityState currentState = storage.getLatestTransactionVisibilityState();
-      if (currentState != null) {
-        if (currentState.getTimestamp() < (now - 2 * snapshotRefreshFrequency)) {
-          LOG.info("Current snapshot is old, will force a refresh on next run.");
-          reset();
-        } else {
-          latestState = currentState;
-          LOG.info("Transaction state reloaded with snapshot from " + latestState.getTimestamp());
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Latest transaction snapshot: " + latestState.toString());
-          }
-          lastRefresh = now;
-        }
-      } else {
-        LOG.info("No transaction state found.");
-      }
-    }
-  }
-
-  public TransactionVisibilityState getLatestState() {
-    return latestState;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCacheSupplier.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCacheSupplier.java b/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCacheSupplier.java
deleted file mode 100644
index 9f4a778..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/coprocessor/TransactionStateCacheSupplier.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.coprocessor;
-
-import com.google.common.base.Supplier;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Supplies instances of {@link TransactionStateCache} implementations.
- */
-public class TransactionStateCacheSupplier implements Supplier<TransactionStateCache> {
-  protected static volatile TransactionStateCache instance;
-  protected static Object lock = new Object();
-
-  protected final Configuration conf;
-
-  public TransactionStateCacheSupplier(Configuration conf) {
-    this.conf = conf;
-  }
-
-  /**
-   * Returns a singleton instance of the transaction state cache, performing lazy initialization if necessary.
-   * @return A shared instance of the transaction state cache.
-   */
-  @Override
-  public TransactionStateCache get() {
-    if (instance == null) {
-      synchronized (lock) {
-        if (instance == null) {
-          instance = new TransactionStateCache();
-          instance.setConf(conf);
-          instance.start();
-        }
-      }
-    }
-    return instance;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/coprocessor/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/coprocessor/package-info.java b/tephra-core/src/main/java/co/cask/tephra/coprocessor/package-info.java
deleted file mode 100644
index 5fa566c..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/coprocessor/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains HBase coprocessor implementations for the transaction system.
- */
-package co.cask.tephra.coprocessor;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/AbstractClientProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/AbstractClientProvider.java b/tephra-core/src/main/java/co/cask/tephra/distributed/AbstractClientProvider.java
deleted file mode 100644
index d0a3603..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/AbstractClientProvider.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import co.cask.tephra.TxConstants;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.twill.discovery.Discoverable;
-import org.apache.twill.discovery.DiscoveryServiceClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * An abstract tx client provider that implements common functionality.
- */
-public abstract class AbstractClientProvider implements ThriftClientProvider {
-
-  private static final Logger LOG = LoggerFactory.getLogger(AbstractClientProvider.class);
-
-  // Discovery service. If null, no service discovery.
-  private final DiscoveryServiceClient discoveryServiceClient;
-  protected final AtomicBoolean initialized = new AtomicBoolean(false);
-
-  // the configuration
-  final Configuration configuration;
-
-  // the endpoint strategy for service discovery.
-  EndpointStrategy endpointStrategy;
-
-  protected AbstractClientProvider(Configuration configuration, DiscoveryServiceClient discoveryServiceClient) {
-    this.configuration = configuration;
-    this.discoveryServiceClient = discoveryServiceClient;
-  }
-
-  public void initialize() throws TException {
-    if (initialized.compareAndSet(false, true)) {
-      this.initDiscovery();
-    }
-  }
-
-  /**
-   * Initialize the service discovery client, we will reuse that
-   * every time we need to create a new client.
-   */
-  private void initDiscovery() {
-    if (discoveryServiceClient == null) {
-      LOG.info("No DiscoveryServiceClient provided. Skipping service discovery.");
-      return;
-    }
-
-    endpointStrategy = new TimeLimitEndpointStrategy(
-      new RandomEndpointStrategy(
-        discoveryServiceClient.discover(
-          configuration.get(TxConstants.Service.CFG_DATA_TX_DISCOVERY_SERVICE_NAME,
-                            TxConstants.Service.DEFAULT_DATA_TX_DISCOVERY_SERVICE_NAME))),
-      2, TimeUnit.SECONDS);
-  }
-
-  protected TransactionServiceThriftClient newClient() throws TException {
-    return newClient(-1);
-  }
-
-  protected TransactionServiceThriftClient newClient(int timeout) throws TException {
-    initialize();
-    String address;
-    int port;
-
-    if (endpointStrategy == null) {
-      // if there is no discovery service, try to read host and port directly
-      // from the configuration
-      LOG.info("Reading address and port from configuration.");
-      address = configuration.get(TxConstants.Service.CFG_DATA_TX_BIND_ADDRESS,
-                                  TxConstants.Service.DEFAULT_DATA_TX_BIND_ADDRESS);
-      port = configuration.getInt(TxConstants.Service.CFG_DATA_TX_BIND_PORT,
-                                  TxConstants.Service.DEFAULT_DATA_TX_BIND_PORT);
-      LOG.info("Service assumed at " + address + ":" + port);
-    } else {
-      Discoverable endpoint = endpointStrategy.pick();
-      if (endpoint == null) {
-        LOG.error("Unable to discover tx service.");
-        throw new TException("Unable to discover tx service.");
-      }
-      address = endpoint.getSocketAddress().getHostName();
-      port = endpoint.getSocketAddress().getPort();
-      LOG.info("Service discovered at " + address + ":" + port);
-    }
-
-    // now we have an address and port, try to connect a client
-    if (timeout < 0) {
-      timeout = configuration.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_TIMEOUT,
-          TxConstants.Service.DEFAULT_DATA_TX_CLIENT_TIMEOUT_MS);
-    }
-    LOG.info("Attempting to connect to tx service at " +
-               address + ":" + port + " with timeout " + timeout + " ms.");
-    // thrift transport layer
-    TTransport transport =
-        new TFramedTransport(new TSocket(address, port, timeout));
-    try {
-      transport.open();
-    } catch (TTransportException e) {
-      LOG.error("Unable to connect to tx service: " + e.getMessage());
-      throw e;
-    }
-    // and create a thrift client
-    TransactionServiceThriftClient newClient = new TransactionServiceThriftClient(transport);
-
-    LOG.info("Connected to tx service at " +
-               address + ":" + port);
-    return newClient;
-  }
-
-  /**
-   * This class helps picking up an endpoint from a list of Discoverable.
-   */
-  public interface EndpointStrategy {
-
-    /**
-     * Picks a {@link Discoverable} using its strategy.
-     * @return A {@link Discoverable} based on the stragegy or {@code null} if no endpoint can be found.
-     */
-    Discoverable pick();
-  }
-
-  /**
-   * An {@link EndpointStrategy} that make sure it picks an endpoint within the given
-   * timeout limit.
-   */
-  public final class TimeLimitEndpointStrategy implements EndpointStrategy {
-
-    private final EndpointStrategy delegate;
-    private final long timeout;
-    private final TimeUnit timeoutUnit;
-
-    public TimeLimitEndpointStrategy(EndpointStrategy delegate, long timeout, TimeUnit timeoutUnit) {
-      this.delegate = delegate;
-      this.timeout = timeout;
-      this.timeoutUnit = timeoutUnit;
-    }
-
-    @Override
-    public Discoverable pick() {
-      Discoverable pick = delegate.pick();
-      try {
-        long count = 0;
-        while (pick == null && count++ < timeout) {
-          timeoutUnit.sleep(1);
-          pick = delegate.pick();
-        }
-      } catch (InterruptedException e) {
-        // Simply propagate the interrupt.
-        Thread.currentThread().interrupt();
-      }
-      return pick;
-    }
-  }
-
-  /**
-   * Randomly picks endpoint from the list of available endpoints.
-   */
-  public final class RandomEndpointStrategy implements EndpointStrategy {
-
-    private final Iterable<Discoverable> endpoints;
-
-    /**
-     * Constructs a random endpoint strategy.
-     * @param endpoints Endpoints for the strategy to use. Note that this strategy will
-     *                  invoke {@link Iterable#iterator()} and traverse through it on
-     *                  every call to the {@link #pick()} method. One could leverage this
-     *                  behavior with the live {@link Iterable} as provided by
-     *                  {@link org.apache.twill.discovery.DiscoveryServiceClient#discover(String)} method.
-     */
-    public RandomEndpointStrategy(Iterable<Discoverable> endpoints) {
-      this.endpoints = endpoints;
-    }
-
-    @Override
-    public Discoverable pick() {
-      // Reservoir sampling
-      Discoverable result = null;
-      Iterator<Discoverable> itor = endpoints.iterator();
-      Random random = new Random();
-      int count = 0;
-      while (itor.hasNext()) {
-        Discoverable next = itor.next();
-        if (random.nextInt(++count) == 0) {
-          result = next;
-        }
-      }
-      return result;
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/CloseableThriftClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/CloseableThriftClient.java b/tephra-core/src/main/java/co/cask/tephra/distributed/CloseableThriftClient.java
deleted file mode 100644
index ba6da24..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/CloseableThriftClient.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-/**
- * An {@link AutoCloseable} to automatically return the thrift client to the ThriftClientProvider.
- */
-public class CloseableThriftClient implements AutoCloseable {
-
-  private final ThriftClientProvider provider;
-  private final TransactionServiceThriftClient thriftClient;
-
-  public CloseableThriftClient(ThriftClientProvider provider, TransactionServiceThriftClient thriftClient) {
-    this.provider = provider;
-    this.thriftClient = thriftClient;
-  }
-
-  public TransactionServiceThriftClient getThriftClient() {
-    return thriftClient;
-  }
-
-  @Override
-  public void close() {
-    // in any case, the client must be returned to the pool. The pool is
-    // responsible for discarding the client if it is in a bad state.
-    provider.returnClient(thriftClient);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/ElasticPool.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/ElasticPool.java b/tephra-core/src/main/java/co/cask/tephra/distributed/ElasticPool.java
deleted file mode 100644
index 843ab90..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/ElasticPool.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * An Elastic Pool is an object pool that can dynamically grow.
- * Normally, an element is obtained by a client and then returned to the pool
- * after use. However, if the element gets into a bad state, the element can
- * be discarded, based upon the recycle() method returning false. This will
- * cause the element to be removed from the pool, and for a subsequent request,
- * a new element can be created on the fly to replace the discarded one.
- *
- * The pool starts with zero (active) elements. Every time a client attempts
- * to obtain an element, an element from the pool is returned if available.
- * Otherwise, if the number of active elements is less than the pool's limit,
- * a new element is created (using abstract method create(), this must be
- * overridden by all implementations), and the number of active elements is
- * increased by one. If the limit is reached, then obtain() blocks until
- * either an element is returned to the pool or, if the obtain method with timeout
- * parameters is used, a timeout occurs.
- *
- * Every time an element is returned to the pool, it is "recycled" to restore its
- * fresh state for the next use or destroyed, depending on its state.
- *
- * @param <T> the type of the elements
- * @param <E> the type of exception thrown by create()
- */
-public abstract class ElasticPool<T, E extends Exception> {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ElasticPool.class);
-
-  /**
-   * A method to create a new element. Will be called every time the pool
-   * of available elements is empty but the limit of active elements is
-   * not exceeded.
-   * @return a new element
-   */
-  protected abstract T create() throws E;
-
-  /**
-   * A method to recycle an existing element when it is returned to the pool.
-   * This methods ensures that the element is in a fresh state before it can
-   * be reused by the next agent. If the element is not to be returned to the pool,
-   * calling code is responsible for destroying it and returning false.
-   *
-   * @param element the element to recycle
-   * @return true to reuse element, false to remove it from the pool
-   */
-  protected boolean recycle(T element) {
-    // by default, simply return true
-    return true;
-  }
-
-  // holds all currently available elements
-  private final ConcurrentLinkedQueue<T> elements;
-
-  // we keep track of elements via the permits of a semaphore, because there can
-  // be elements in a queue, but also elements that are "loaned out" count towards
-  // the pool's size limit
-  private final Semaphore semaphore;
-
-  public ElasticPool(int sizeLimit) {
-    elements = new ConcurrentLinkedQueue<>();
-    semaphore = new Semaphore(sizeLimit, true);
-  }
-
-  /**
-   * Get a element from the pool. If there is an available element in
-   * the pool, it will be returned. Otherwise, if the number of active
-   * elements does not exceed the limit, a new element is created with
-   * create() and returned. Otherwise, blocks until an element is either
-   * released and returned to the pool, or an element is discarded,
-   * allowing for the creation of a new element.
-   *
-   * @return an element
-   */
-  public T obtain() throws E, InterruptedException {
-    semaphore.acquire();
-    return getOrCreate();
-  }
-
-  /**
-   * Get a element from the pool. If there is an available element in
-   * the pool, it will be returned. Otherwise, if the number of active
-   * elements does ot exceed the limit, a new element is created with
-   * create() and returned. Otherwise, blocks until an element is either
-   * released and returned to the pool, an element is discarded,
-   * allowing for the creation of a new element, or a timeout occurs.
-   *
-   * @param timeout the timeout for trying to obtain an element
-   * @param unit the timeout unit for trying to obtain an element
-   * @return an element
-   * @throws TimeoutException if a client is not able to be obtained within the given timeout
-   */
-  public T obtain(long timeout, TimeUnit unit) throws E, TimeoutException, InterruptedException {
-    if (!semaphore.tryAcquire(1, timeout, unit)) {
-      throw new TimeoutException(String.format("Failed to obtain client within %d %s.",
-                                               timeout, unit));
-    }
-    return getOrCreate();
-  }
-
-  // gets an element from the queue, or creates one if there is none available in the queue.
-  // the semaphore must be acquired before calling this method. The semaphore will be released from within
-  // this method if it throws any exception
-  private T getOrCreate() throws E {
-    try {
-      T client = elements.poll();
-      // a client was available, all good. otherwise, create one
-      if (client != null) {
-        return client;
-      }
-      return create();
-    } catch (Exception e) {
-      // if an exception is thrown after acquiring the semaphore, release the
-      // semaphore before propagating the exception
-      semaphore.release();
-      throw e;
-    }
-  }
-
-  /**
-   * Returns an element to the pool of available elements. The element must
-   * have been obtained from this pool through obtain(). The recycle() method
-   * is called before the element is available for obtain().
-   *
-   * @param element the element to be returned
-   */
-  public void release(T element) {
-    try {
-      if (recycle(element)) {
-        elements.add(element);
-      }
-    } finally {
-      semaphore.release();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/PooledClientProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/PooledClientProvider.java b/tephra-core/src/main/java/co/cask/tephra/distributed/PooledClientProvider.java
deleted file mode 100644
index 3394811..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/PooledClientProvider.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import co.cask.tephra.TxConstants;
-import com.google.common.base.Throwables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.thrift.TException;
-import org.apache.twill.discovery.DiscoveryServiceClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * This is an tx client provider that uses a bounded size pool of connections.
- */
-public class PooledClientProvider extends AbstractClientProvider {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(PooledClientProvider.class);
-
-  // we will use this as a pool of tx clients
-  class TxClientPool extends ElasticPool<TransactionServiceThriftClient, TException> {
-    TxClientPool(int sizeLimit) {
-      super(sizeLimit);
-    }
-
-    @Override
-    protected TransactionServiceThriftClient create() throws TException {
-      return newClient();
-    }
-
-    @Override
-    protected boolean recycle(TransactionServiceThriftClient client) {
-      if (!client.isValid()) {
-        client.close();
-        return false;
-      }
-      return true;
-    }
-  }
-
-  // we will use this as a pool of tx clients
-  private volatile TxClientPool clients;
-
-  // the limit for the number of active clients
-  private int maxClients;
-  // timeout, for obtaining a client
-  private long obtainClientTimeoutMs;
-
-  public PooledClientProvider(Configuration conf, DiscoveryServiceClient discoveryServiceClient) {
-    super(conf, discoveryServiceClient);
-  }
-
-  private void initializePool() throws TException {
-    // initialize the super class (needed for service discovery)
-    super.initialize();
-
-    // create a (empty) pool of tx clients
-    maxClients = configuration.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_COUNT,
-                                      TxConstants.Service.DEFAULT_DATA_TX_CLIENT_COUNT);
-    if (maxClients < 1) {
-      LOG.warn("Configuration of " + TxConstants.Service.CFG_DATA_TX_CLIENT_COUNT +
-                 " is invalid: value is " + maxClients + " but must be at least 1. " +
-                 "Using 1 as a fallback. ");
-      maxClients = 1;
-    }
-
-    obtainClientTimeoutMs =
-      configuration.getLong(TxConstants.Service.CFG_DATA_TX_CLIENT_OBTAIN_TIMEOUT_MS,
-                            TxConstants.Service.DEFAULT_DATA_TX_CLIENT_OBTAIN_TIMEOUT_MS);
-    if (obtainClientTimeoutMs < 0) {
-      LOG.warn("Configuration of " + TxConstants.Service.CFG_DATA_TX_CLIENT_COUNT +
-                 " is invalid: value is " + obtainClientTimeoutMs + " but must be at least 0. " +
-                 "Using 0 as a fallback. ");
-      obtainClientTimeoutMs = 0;
-    }
-    this.clients = new TxClientPool(maxClients);
-  }
-
-  @Override
-  public CloseableThriftClient getCloseableClient() throws TException, TimeoutException, InterruptedException {
-    TransactionServiceThriftClient client = getClientPool().obtain(obtainClientTimeoutMs, TimeUnit.MILLISECONDS);
-    return new CloseableThriftClient(this, client);
-  }
-
-  @Override
-  public void returnClient(TransactionServiceThriftClient client) {
-    getClientPool().release(client);
-  }
-
-  @Override
-  public String toString() {
-    return "Elastic pool of size " + this.maxClients +
-      ", with timeout (in milliseconds): " + this.obtainClientTimeoutMs;
-  }
-
-  private TxClientPool getClientPool() {
-    if (clients != null) {
-      return clients;
-    }
-
-    synchronized (this) {
-      if (clients == null) {
-        try {
-          initializePool();
-        } catch (TException e) {
-          LOG.error("Failed to initialize Tx client provider", e);
-          throw Throwables.propagate(e);
-        }
-      }
-    }
-    return clients;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/RetryNTimes.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryNTimes.java b/tephra-core/src/main/java/co/cask/tephra/distributed/RetryNTimes.java
deleted file mode 100644
index 22a4015..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryNTimes.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import co.cask.tephra.TxConstants;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * A retry strategy that makes N attempts and then gives up. This does
- * not do anything before the re-attempt - extend this class to add a
- * sleep or similar.
- */
-public class RetryNTimes extends RetryStrategy {
-
-  int attempts = 0;
-  int limit;
-
-  /**
-   * @param maxAttempts the number of attempts after which to stop
-   */
-  protected RetryNTimes(int maxAttempts) {
-    limit = maxAttempts;
-  }
-
-  @Override
-  boolean failOnce() {
-    ++attempts;
-    return attempts < limit;
-  }
-
-  /**
-   * A retry strategy provider for this strategy.
-   */
-  public static class Provider implements RetryStrategyProvider {
-
-    int nTimes;
-
-    public Provider() {
-      this.nTimes = TxConstants.Service.DEFAULT_DATA_TX_CLIENT_ATTEMPTS;
-    }
-
-    @Override
-    public void configure(Configuration config) {
-      nTimes = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_ATTEMPTS, nTimes);
-    }
-
-    @Override
-    public RetryStrategy newRetryStrategy() {
-      return new RetryNTimes(nTimes);
-    }
-
-    @Override
-    public String toString() {
-      return nTimes + " attempts without delay";
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategy.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategy.java b/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategy.java
deleted file mode 100644
index 80ae779..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategy.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-/**
- * A retry strategy is an abstraction over how the remote tx client shuold retry operations after connection
- * failures.
- */
-public abstract class RetryStrategy {
-
-  /**
-   * Increments the number of failed attempts.
-   * @return whether another attempt should be made
-   */
-  abstract boolean failOnce();
-
-  /**
-   * Should be called before re-attempting. This can, for instance
-   * inject a sleep time between retries. Default implementation is
-   * to do nothing.
-   */
-  void beforeRetry() {
-    // do nothinhg
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategyProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategyProvider.java b/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategyProvider.java
deleted file mode 100644
index 3ea2e94..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryStrategyProvider.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * A retry strategy provider is used by the tx client to get a new retry strategy for every call.
- */
-public interface RetryStrategyProvider {
-
-  /**
-   * Provides a new instance of a retry strategy.
-   * @return a retry strategy
-   */
-  RetryStrategy newRetryStrategy();
-
-  /**
-   * Configure the strategy.
-   * @param config the configuration
-   */
-  void configure(Configuration config);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/RetryWithBackoff.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryWithBackoff.java b/tephra-core/src/main/java/co/cask/tephra/distributed/RetryWithBackoff.java
deleted file mode 100644
index 3226ba7..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/RetryWithBackoff.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import co.cask.tephra.TxConstants;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A retry strategy that makes N attempts and then gives up. This does
- * not do anything before the re-attempt - extend this class to add a
- * sleep or similar.
- */
-public class RetryWithBackoff extends RetryStrategy {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(RetryWithBackoff.class);
-
-  int initialSleep; // initial sleep time
-  int backoffFactor; // factor by which to increase sleep for each retry
-  int maxSleep; // max sleep time. stop retrying when we exceed this
-  int sleep; // current sleep time
-
-  /**
-   * @param initial the initial sleep time (before first retry)
-   * @param backoff the backoff factor by which sleep time is multiplied
-   *                after each retry
-   * @param limit the max sleep time. if sleep time reaches this limit, we
-   *              stop retrying
-   */
-  protected RetryWithBackoff(int initial, int backoff, int limit) {
-    initialSleep = initial;
-    backoffFactor = backoff;
-    maxSleep = limit;
-    sleep = initialSleep;
-  }
-
-  @Override
-  boolean failOnce() {
-    return sleep < maxSleep;
-  }
-
-  @Override
-  void beforeRetry() {
-    LOG.info("Sleeping " + sleep + " ms before retry.");
-    long current = System.currentTimeMillis();
-    long end = current + sleep;
-    while (current < end) {
-      try {
-        Thread.sleep(end - current);
-      } catch (InterruptedException e) {
-        // do nothing
-      }
-      current = System.currentTimeMillis();
-    }
-    sleep = sleep * backoffFactor;
-  }
-
-  /**
-   * A provider for this retry strategy.
-   */
-  public static class Provider implements RetryStrategyProvider {
-
-    int initialSleep; // initial sleep time
-    int backoffFactor; // factor by which to increase sleep for each retry
-    int maxSleep; // max sleep time. stop retrying when we exceed this
-
-    public Provider() {
-      initialSleep = TxConstants.Service.DEFAULT_DATA_TX_CLIENT_BACKOFF_INITIAL;
-      backoffFactor = TxConstants.Service.DEFAULT_DATA_TX_CLIENT_BACKOFF_FACTOR;
-      maxSleep = TxConstants.Service.DEFAULT_DATA_TX_CLIENT_BACKOFF_LIMIT;
-    }
-
-    public void configure(Configuration config) {
-      initialSleep = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_BACKOFF_INITIAL, initialSleep);
-      backoffFactor = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_BACKOFF_FACTOR, backoffFactor);
-      maxSleep = config.getInt(TxConstants.Service.CFG_DATA_TX_CLIENT_BACKOFF_LIMIT, maxSleep);
-    }
-
-    @Override
-    public RetryStrategy newRetryStrategy() {
-      return new RetryWithBackoff(initialSleep, backoffFactor, maxSleep);
-    }
-
-    @Override
-    public String toString() {
-      return "sleep " + initialSleep + " ms with back off factor " +
-          backoffFactor + " and limit " + maxSleep + " ms";
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/SingleUseClientProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/SingleUseClientProvider.java b/tephra-core/src/main/java/co/cask/tephra/distributed/SingleUseClientProvider.java
deleted file mode 100644
index 8673ca4..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/SingleUseClientProvider.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.thrift.TException;
-import org.apache.twill.discovery.DiscoveryServiceClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeoutException;
-
-/**
- * An tx client provider that creates a new connection every time.
- */
-public class SingleUseClientProvider extends AbstractClientProvider {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(SingleUseClientProvider.class);
-
-  public SingleUseClientProvider(Configuration conf, DiscoveryServiceClient discoveryServiceClient, int timeout) {
-    super(conf, discoveryServiceClient);
-    this.timeout = timeout;
-  }
-
-  final int timeout;
-
-  @Override
-  public CloseableThriftClient getCloseableClient() throws TException, TimeoutException, InterruptedException {
-    try {
-      return new CloseableThriftClient(this, newClient(timeout));
-    } catch (TException e) {
-      LOG.error("Unable to create new tx client: " + e.getMessage());
-      throw e;
-    }
-  }
-
-  @Override
-  public void returnClient(TransactionServiceThriftClient client) {
-    client.close();
-  }
-
-  @Override
-  public String toString() {
-    return "Single-use(timeout = " + timeout + ")";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/ThreadLocalClientProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/ThreadLocalClientProvider.java b/tephra-core/src/main/java/co/cask/tephra/distributed/ThreadLocalClientProvider.java
deleted file mode 100644
index eac75bd..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/ThreadLocalClientProvider.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.thrift.TException;
-import org.apache.twill.discovery.DiscoveryServiceClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeoutException;
-
-/**
- * An tx client provider that uses thread local to maintain at most one open connection per thread.
- * Note that there can be a connection leak if the threads are recycled.
- */
-public class ThreadLocalClientProvider extends AbstractClientProvider {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ThreadLocalClientProvider.class);
-
-  ThreadLocal<TransactionServiceThriftClient> clients = new ThreadLocal<>();
-
-  public ThreadLocalClientProvider(Configuration conf, DiscoveryServiceClient discoveryServiceClient) {
-    super(conf, discoveryServiceClient);
-  }
-
-  @Override
-  public CloseableThriftClient getCloseableClient() throws TException, TimeoutException, InterruptedException {
-    TransactionServiceThriftClient client = this.clients.get();
-    if (client == null) {
-      try {
-        client = this.newClient();
-        clients.set(client);
-      } catch (TException e) {
-        LOG.error("Unable to create new tx client for thread: "
-                    + e.getMessage());
-        throw e;
-      }
-    }
-    return new CloseableThriftClient(this, client);
-  }
-
-  @Override
-  public void returnClient(TransactionServiceThriftClient client) {
-    if (!client.isValid()) {
-      client.close();
-      clients.remove();
-    }
-  }
-
-  @Override
-  public String toString() {
-    return "Thread-local";
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/ThriftClientProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/ThriftClientProvider.java b/tephra-core/src/main/java/co/cask/tephra/distributed/ThriftClientProvider.java
deleted file mode 100644
index 1e86c08..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/ThriftClientProvider.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import org.apache.thrift.TException;
-
-import java.util.concurrent.TimeoutException;
-
-/**
- * This interface is used to provide thrift tx service clients:
- * there is only one (singleton)
- * tx service per JVM, but many threads may use it concurrently.
- * However, being a thrift client, it is not thread-safe. In
- * order to avoid serializing all tx calls by synchronizing
- * on the tx service client, we employ a pool of clients. But
- * in different scenarios there are different strategies for
- * pooling: If there are many short-lived threads, it is wise
- * to have a shared pool between all threads. But if there are
- * few long-lived threads, it may be better to have thread-local
- * client for each thread.
- *
- * This interface provides an abstraction of the pooling strategy.
- */
-public interface ThriftClientProvider {
-
-  /**
-   * Initialize the provider. At this point, it should be verified
-   * that tx service is up and running and getClient() can
-   * create new clients when necessary.
-   */
-  void initialize() throws TException;
-
-  /**
-   * Retrieve an AutoCloseable wrapper around  tx client for exclusive use by the
-   * current thread. The client must be closed (returned) to the provider after use.
-   * @return an tx client, connected and fully functional
-   */
-  CloseableThriftClient getCloseableClient() throws TException,
-    TimeoutException, InterruptedException;
-
-  /**
-   * Release an tx client back to the provider's pool, if the client is valid.
-   * If the client becomes disfunctional, for instance, due to a socket
-   * exception. The provider must make sure to close the client, and it
-   * must remove the client from its arsenal and be prepared to create
-   * a new client subsequently.
-   *
-   * @param client The client to return
-   */
-  void returnClient(TransactionServiceThriftClient client);
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionConverterUtils.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionConverterUtils.java b/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionConverterUtils.java
deleted file mode 100644
index bda39fd..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionConverterUtils.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionType;
-import co.cask.tephra.distributed.thrift.TTransaction;
-import co.cask.tephra.distributed.thrift.TTransactionType;
-import co.cask.tephra.distributed.thrift.TVisibilityLevel;
-import com.google.common.primitives.Longs;
-
-/**
- * Utility methods to convert to thrift and back.
- */
-public final class TransactionConverterUtils {
-  private static final long[] EMPTY_LONG_ARRAY = {};
-
-  public static TTransaction wrap(Transaction tx) {
-    return new TTransaction(tx.getTransactionId(), tx.getReadPointer(),
-                            Longs.asList(tx.getInvalids()), Longs.asList(tx.getInProgress()),
-                            tx.getFirstShortInProgress(), getTTransactionType(tx.getType()),
-                            tx.getWritePointer(), Longs.asList(tx.getCheckpointWritePointers()),
-                            getTVisibilityLevel(tx.getVisibilityLevel()));
-  }
-
-  public static Transaction unwrap(TTransaction thriftTx) {
-    return new Transaction(thriftTx.getReadPointer(), thriftTx.getTransactionId(), thriftTx.getWritePointer(),
-                           thriftTx.getInvalids() == null ? EMPTY_LONG_ARRAY : Longs.toArray(thriftTx.getInvalids()),
-                           thriftTx.getInProgress() == null ? EMPTY_LONG_ARRAY :
-                               Longs.toArray(thriftTx.getInProgress()),
-                           thriftTx.getFirstShort(), getTransactionType(thriftTx.getType()),
-                           thriftTx.getCheckpointWritePointers() == null ? EMPTY_LONG_ARRAY :
-                               Longs.toArray(thriftTx.getCheckpointWritePointers()),
-                           getVisibilityLevel(thriftTx.getVisibilityLevel()));
-  }
-
-  private static TransactionType getTransactionType(TTransactionType tType) {
-    return tType == TTransactionType.SHORT ? TransactionType.SHORT : TransactionType.LONG;
-  }
-
-  private static TTransactionType getTTransactionType(TransactionType type) {
-    return type == TransactionType.SHORT ? TTransactionType.SHORT : TTransactionType.LONG;
-  }
-
-  private static Transaction.VisibilityLevel getVisibilityLevel(TVisibilityLevel tLevel) {
-    // default to SNAPSHOT
-    if (tLevel == null) {
-      return Transaction.VisibilityLevel.SNAPSHOT;
-    }
-
-    switch (tLevel) {
-      case SNAPSHOT:
-        return Transaction.VisibilityLevel.SNAPSHOT;
-      case SNAPSHOT_EXCLUDE_CURRENT:
-        return Transaction.VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
-      case SNAPSHOT_ALL:
-        return Transaction.VisibilityLevel.SNAPSHOT_ALL;
-      default:
-        throw new IllegalArgumentException("Unknown TVisibilityLevel: " + tLevel);
-    }
-  }
-
-  private static TVisibilityLevel getTVisibilityLevel(Transaction.VisibilityLevel level) {
-    switch (level) {
-      case SNAPSHOT:
-        return TVisibilityLevel.SNAPSHOT;
-      case SNAPSHOT_EXCLUDE_CURRENT:
-        return TVisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT;
-      case SNAPSHOT_ALL:
-        return TVisibilityLevel.SNAPSHOT_ALL;
-      default:
-        throw new IllegalArgumentException("Unknown VisibilityLevel: " + level);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionService.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionService.java b/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionService.java
deleted file mode 100644
index 22e986d..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionService.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.distributed.thrift.TTransactionServer;
-import co.cask.tephra.inmemory.InMemoryTransactionService;
-import co.cask.tephra.rpc.ThriftRPCServer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.twill.api.ElectionHandler;
-import org.apache.twill.discovery.DiscoveryService;
-import org.apache.twill.internal.ServiceListenerAdapter;
-import org.apache.twill.internal.zookeeper.LeaderElection;
-import org.apache.twill.zookeeper.ZKClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- *
- */
-public final class TransactionService extends InMemoryTransactionService {
-  private static final Logger LOG = LoggerFactory.getLogger(TransactionService.class);
-  private LeaderElection leaderElection;
-  private final ZKClient zkClient;
-
-  private ThriftRPCServer<TransactionServiceThriftHandler, TTransactionServer> server;
-
-  @Inject
-  public TransactionService(Configuration conf,
-                            ZKClient zkClient,
-                            DiscoveryService discoveryService,
-                            Provider<TransactionManager> txManagerProvider) {
-    super(conf, discoveryService, txManagerProvider);
-    this.zkClient = zkClient;
-  }
-
-  @Override
-  protected InetSocketAddress getAddress() {
-    if (address.equals("0.0.0.0")) {
-      // resolve hostname
-      try {
-        return new InetSocketAddress(InetAddress.getLocalHost().getHostName(), server.getBindAddress().getPort());
-      } catch (UnknownHostException x) {
-        LOG.error("Cannot resolve hostname for 0.0.0.0", x);
-      }
-    }
-    return server.getBindAddress();
-  }
-
-  @Override
-  protected void doStart() {
-    leaderElection = new LeaderElection(zkClient, "/tx.service/leader", new ElectionHandler() {
-      @Override
-      public void leader() {
-        // if the txManager fails, we should stop the server
-        txManager = txManagerProvider.get();
-        txManager.addListener(new ServiceListenerAdapter() {
-          @Override
-          public void failed(State from, Throwable failure) {
-            LOG.error("Transaction manager aborted, stopping transaction service");
-            TransactionService.this.abort(failure);
-          }
-        }, MoreExecutors.sameThreadExecutor());
-
-        server = ThriftRPCServer.builder(TTransactionServer.class)
-          .setHost(address)
-          .setPort(port)
-          .setWorkerThreads(threads)
-          .setMaxReadBufferBytes(maxReadBufferBytes)
-          .setIOThreads(ioThreads)
-          .build(new TransactionServiceThriftHandler(txManager));
-        try {
-          server.startAndWait();
-          doRegister();
-          LOG.info("Transaction Thrift Service started successfully on " + getAddress());
-        } catch (Throwable t) {
-          LOG.info("Transaction Thrift Service didn't start on " + server.getBindAddress());
-          leaderElection.stop();
-          notifyFailed(t);
-        }
-      }
-
-      @Override
-      public void follower() {
-        // First stop the transaction server as un-registering from discovery can block sometimes.
-        // That can lead to multiple transaction servers being active at the same time.
-        if (server != null && server.isRunning()) {
-          server.stopAndWait();
-        }
-        undoRegister();
-      }
-    });
-    leaderElection.start();
-
-    notifyStarted();
-  }
-
-  @VisibleForTesting
-  State thriftRPCServerState() {
-    return server.state();
-  }
-
-  @Override
-  protected void doStop() {
-    internalStop();
-    notifyStopped();
-  }
-
-  protected void abort(Throwable cause) {
-    // try to clear leader status and shutdown RPC
-    internalStop();
-    notifyFailed(cause);
-  }
-
-  protected void internalStop() {
-    if (leaderElection != null) {
-      // NOTE: if was a leader this will cause loosing of leadership which in callback above will
-      //       de-register service in discovery service and stop the service if needed
-      try {
-        Uninterruptibles.getUninterruptibly(leaderElection.stop(), 5, TimeUnit.SECONDS);
-      } catch (TimeoutException te) {
-        LOG.warn("Timed out waiting for leader election cancellation to complete");
-      } catch (ExecutionException e) {
-        LOG.error("Exception when cancelling leader election.", e);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceClient.java b/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceClient.java
deleted file mode 100644
index 4f29730..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceClient.java
+++ /dev/null
@@ -1,473 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import co.cask.tephra.InvalidTruncateTimeException;
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionCouldNotTakeSnapshotException;
-import co.cask.tephra.TransactionNotInProgressException;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.runtime.ConfigModule;
-import co.cask.tephra.runtime.DiscoveryModules;
-import co.cask.tephra.runtime.TransactionClientModule;
-import co.cask.tephra.runtime.TransactionModules;
-import co.cask.tephra.runtime.ZKModule;
-import co.cask.tephra.util.ConfigurationFactory;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
-import com.google.inject.Guice;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.thrift.TException;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.InputStream;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Set;
-
-/**
- * A tx service client
- */
-public class TransactionServiceClient implements TransactionSystemClient {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TransactionServiceClient.class);
-
-  // we will use this to provide every call with an tx client
-  private ThriftClientProvider clientProvider;
-
-  // the retry strategy we will use
-  private final RetryStrategyProvider retryStrategyProvider;
-
-  /**
-   * Utility to be used for basic verification of transaction system availability and functioning
-   * @param args arguments list, accepts single option "-v" that makes it to print out more details about started tx
-   * @throws Exception
-   */
-  public static void main(String[] args) throws Exception {
-    if (args.length > 1 || (args.length == 1 && !"-v".equals(args[0]))) {
-      System.out.println("USAGE: TransactionServiceClient [-v]");
-    }
-
-    boolean verbose = false;
-    if (args.length == 1 && "-v".equals(args[0])) {
-      verbose = true;
-    }
-    doMain(verbose, new ConfigurationFactory().get());
-  }
-
-  @VisibleForTesting
-  public static void doMain(boolean verbose, Configuration conf) throws Exception {
-    LOG.info("Starting tx server client test.");
-    Injector injector = Guice.createInjector(
-      new ConfigModule(conf),
-      new ZKModule(),
-      new DiscoveryModules().getDistributedModules(),
-      new TransactionModules().getDistributedModules(),
-      new TransactionClientModule()
-    );
-
-    ZKClientService zkClient = injector.getInstance(ZKClientService.class);
-    zkClient.startAndWait();
-
-    try {
-      TransactionServiceClient client = injector.getInstance(TransactionServiceClient.class);
-      LOG.info("Starting tx...");
-      Transaction tx = client.startShort();
-      if (verbose) {
-        LOG.info("Started tx details: " + tx.toString());
-      } else {
-        LOG.info("Started tx: " + tx.getTransactionId() +
-                   ", readPointer: " + tx.getReadPointer() +
-                   ", invalids: " + tx.getInvalids().length +
-                   ", inProgress: " + tx.getInProgress().length);
-      }
-      LOG.info("Checking if canCommit tx...");
-      boolean canCommit = client.canCommit(tx, Collections.<byte[]>emptyList());
-      LOG.info("canCommit: " + canCommit);
-      if (canCommit) {
-        LOG.info("Committing tx...");
-        boolean committed = client.commit(tx);
-        LOG.info("Committed tx: " + committed);
-        if (!committed) {
-          LOG.info("Aborting tx...");
-          client.abort(tx);
-          LOG.info("Aborted tx...");
-        }
-      } else {
-        LOG.info("Aborting tx...");
-        client.abort(tx);
-        LOG.info("Aborted tx...");
-      }
-    } finally {
-      zkClient.stopAndWait();
-    }
-  }
-
-  /**
-   * Create from a configuration. This will first attempt to find a zookeeper
-   * for service discovery. Otherwise it will look for the port in the
-   * config and use localhost.
-   * @param config a configuration containing the zookeeper properties
-   */
-  @Inject
-  public TransactionServiceClient(Configuration config,
-                                  ThriftClientProvider clientProvider) {
-
-    // initialize the retry logic
-    String retryStrat = config.get(
-        TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY,
-        TxConstants.Service.DEFAULT_DATA_TX_CLIENT_RETRY_STRATEGY);
-    if ("backoff".equals(retryStrat)) {
-      this.retryStrategyProvider = new RetryWithBackoff.Provider();
-    } else if ("n-times".equals(retryStrat)) {
-      this.retryStrategyProvider = new RetryNTimes.Provider();
-    } else {
-      String message = "Unknown Retry Strategy '" + retryStrat + "'.";
-      LOG.error(message);
-      throw new IllegalArgumentException(message);
-    }
-    this.retryStrategyProvider.configure(config);
-    LOG.debug("Retry strategy is " + this.retryStrategyProvider);
-
-    this.clientProvider = clientProvider;
-  }
-
-  /**
-   * This is an abstract class that encapsulates an operation. It provides a
-   * method to attempt the actual operation, and it can throw an operation
-   * exception.
-   * @param <T> The return type of the operation
-   */
-  abstract static class Operation<T> {
-
-    /** the name of the operation. */
-    String name;
-
-    /** constructor with name of operation. */
-    Operation(String name) {
-      this.name = name;
-    }
-
-    /** return the name of the operation. */
-    String getName() {
-      return name;
-    }
-
-    /** execute the operation, given an tx client. */
-    abstract T execute(TransactionServiceThriftClient client)
-        throws Exception;
-  }
-
-  /** see execute(operation, client). */
-  private <T> T execute(Operation<T> operation) throws Exception {
-    return execute(operation, null);
-  }
-
-  /**
-   * This is a generic method implementing the somewhat complex execution
-   * and retry logic for operations, to avoid repetitive code.
-   *
-   * Attempts to execute one operation, by obtaining an tx client from
-   * the client provider and passing the operation to the client. If the
-   * call fails with a Thrift exception, apply the retry strategy. If no
-   * more retries are to be made according to the strategy, call the
-   * operation's error method to obtain a value to return. Note that error()
-   * may also throw an exception. Note also that the retry logic is only
-   * applied for thrift exceptions.
-   *
-   * @param operation The operation to be executed
-   * @param provider An tx client provider. If null, then a client will be
-   *                 obtained using the client provider
-   * @param <T> The return type of the operation
-   * @return the result of the operation, or a value returned by error()
-   */
-  private <T> T execute(Operation<T> operation, ThriftClientProvider provider) throws Exception {
-    RetryStrategy retryStrategy = retryStrategyProvider.newRetryStrategy();
-    while (true) {
-      // did we get a custom client provider or do we use the default?
-      if (provider == null) {
-        provider = this.clientProvider;
-      }
-      // this will throw a TException if it cannot get a client
-      try (CloseableThriftClient closeable = provider.getCloseableClient()) {
-        // note that this can throw exceptions other than TException
-        return operation.execute(closeable.getThriftClient());
-
-      } catch (TException te) {
-        // determine whether we should retry
-        boolean retry = retryStrategy.failOnce();
-        if (!retry) {
-          // retry strategy is exceeded, throw an operation exception
-          String message =
-              "Thrift error for " + operation + ": " + te.getMessage();
-          LOG.error(message);
-          LOG.debug(message, te);
-          throw new Exception(message, te);
-        } else {
-          // call retry strategy before retrying
-          retryStrategy.beforeRetry();
-          String msg = "Retrying " + operation.getName() + " after Thrift error: " + te.getMessage();
-          LOG.info(msg);
-          LOG.debug(msg, te);
-        }
-
-      }
-    }
-  }
-
-  @Override
-  public Transaction startLong() {
-    try {
-      return execute(
-        new Operation<Transaction>("startLong") {
-          @Override
-          public Transaction execute(TransactionServiceThriftClient client)
-            throws TException {
-            return client.startLong();
-          }
-        });
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public Transaction startShort() {
-    try {
-      return execute(
-        new Operation<Transaction>("startShort") {
-          @Override
-          public Transaction execute(TransactionServiceThriftClient client)
-            throws TException {
-            return client.startShort();
-          }
-        });
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public Transaction startShort(final int timeout) {
-    try {
-      return execute(
-        new Operation<Transaction>("startShort") {
-          @Override
-          public Transaction execute(TransactionServiceThriftClient client)
-            throws TException {
-            return client.startShort(timeout);
-          }
-        });
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public boolean canCommit(final Transaction tx, final Collection<byte[]> changeIds)
-    throws TransactionNotInProgressException {
-
-    try {
-      return execute(
-        new Operation<Boolean>("canCommit") {
-          @Override
-          public Boolean execute(TransactionServiceThriftClient client)
-            throws Exception {
-            return client.canCommit(tx, changeIds);
-          }
-        });
-    } catch (TransactionNotInProgressException e) {
-      throw e;
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public boolean commit(final Transaction tx) throws TransactionNotInProgressException {
-    try {
-      return this.execute(
-        new Operation<Boolean>("commit") {
-          @Override
-          public Boolean execute(TransactionServiceThriftClient client)
-            throws Exception {
-            return client.commit(tx);
-          }
-        });
-    } catch (TransactionNotInProgressException e) {
-      throw e;
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public void abort(final Transaction tx) {
-    try {
-      this.execute(
-        new Operation<Boolean>("abort") {
-          @Override
-          public Boolean execute(TransactionServiceThriftClient client)
-            throws TException {
-            client.abort(tx);
-            return true;
-          }
-        });
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public boolean invalidate(final long tx) {
-    try {
-      return this.execute(
-        new Operation<Boolean>("invalidate") {
-          @Override
-          public Boolean execute(TransactionServiceThriftClient client)
-            throws TException {
-            return client.invalidate(tx);
-          }
-        });
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public Transaction checkpoint(final Transaction tx) throws TransactionNotInProgressException {
-    try {
-      return this.execute(
-          new Operation<Transaction>("checkpoint") {
-            @Override
-            Transaction execute(TransactionServiceThriftClient client) throws Exception {
-              return client.checkpoint(tx);
-            }
-          }
-      );
-    } catch (TransactionNotInProgressException e) {
-      throw e;
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public InputStream getSnapshotInputStream() throws TransactionCouldNotTakeSnapshotException {
-    try {
-      return this.execute(
-          new Operation<InputStream>("takeSnapshot") {
-            @Override
-            public InputStream execute(TransactionServiceThriftClient client)
-                throws Exception {
-              return client.getSnapshotStream();
-            }
-          });
-    } catch (TransactionCouldNotTakeSnapshotException e) {
-      throw e;
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public String status() {
-    try {
-      return this.execute(
-        new Operation<String>("status") {
-          @Override
-          public String execute(TransactionServiceThriftClient client) throws Exception {
-            return client.status();
-          }
-        });
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public void resetState() {
-    try {
-      this.execute(
-          new Operation<Boolean>("resetState") {
-            @Override
-            public Boolean execute(TransactionServiceThriftClient client)
-                throws TException {
-              client.resetState();
-              return true;
-            }
-          });
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public boolean truncateInvalidTx(final Set<Long> invalidTxIds) {
-    try {
-      return this.execute(
-        new Operation<Boolean>("truncateInvalidTx") {
-          @Override
-          public Boolean execute(TransactionServiceThriftClient client) throws TException {
-            return client.truncateInvalidTx(invalidTxIds);
-          }
-        });
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public boolean truncateInvalidTxBefore(final long time) throws InvalidTruncateTimeException {
-    try {
-      return this.execute(
-        new Operation<Boolean>("truncateInvalidTxBefore") {
-          @Override
-          public Boolean execute(TransactionServiceThriftClient client) throws Exception {
-            return client.truncateInvalidTxBefore(time);
-          }
-        });
-    } catch (InvalidTruncateTimeException e) {
-      throw e;
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public int getInvalidSize() {
-    try {
-      return this.execute(
-        new Operation<Integer>("getInvalidSize") {
-          @Override
-          public Integer execute(TransactionServiceThriftClient client) throws TException {
-            return client.getInvalidSize();
-          }
-        });
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceThriftClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceThriftClient.java b/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceThriftClient.java
deleted file mode 100644
index 01f9b4c..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/distributed/TransactionServiceThriftClient.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.distributed;
-
-import co.cask.tephra.InvalidTruncateTimeException;
-import co.cask.tephra.Transaction;
-import co.cask.tephra.TransactionCouldNotTakeSnapshotException;
-import co.cask.tephra.TransactionNotInProgressException;
-import co.cask.tephra.distributed.thrift.TInvalidTruncateTimeException;
-import co.cask.tephra.distributed.thrift.TTransactionCouldNotTakeSnapshotException;
-import co.cask.tephra.distributed.thrift.TTransactionNotInProgressException;
-import co.cask.tephra.distributed.thrift.TTransactionServer;
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * This class is a wrapper around the thrift tx service client, it takes
- * Operations, converts them into thrift objects, calls the thrift
- * client, and converts the results back to data fabric classes.
- * This class also instruments the thrift calls with metrics.
- */
-public class TransactionServiceThriftClient {
-  private static final Function<byte[], ByteBuffer> BYTES_WRAPPER = new Function<byte[], ByteBuffer>() {
-    @Override
-    public ByteBuffer apply(byte[] input) {
-      return ByteBuffer.wrap(input);
-    }
-  };
-
-  /**
-   * The thrift transport layer. We need this when we close the connection.
-   */
-  TTransport transport;
-
-  /**
-   * The actual thrift client.
-   */
-  TTransactionServer.Client client;
-
-  /**
-   * Whether this client is valid for use.
-   */
-  private final AtomicBoolean isValid = new AtomicBoolean(true);
-
-  /**
-   * Constructor from an existing, connected thrift transport.
-   *
-   * @param transport the thrift transport layer. It must already be connected
-   */
-  public TransactionServiceThriftClient(TTransport transport) {
-    this.transport = transport;
-    // thrift protocol layer, we use binary because so does the service
-    TProtocol protocol = new TBinaryProtocol(transport);
-    // and create a thrift client
-    this.client = new TTransactionServer.Client(protocol);
-  }
-
-  /**
-   * close this client. may be called multiple times
-   */
-  public void close() {
-    if (this.transport.isOpen()) {
-      this.transport.close();
-    }
-  }
-
-  public Transaction startLong() throws TException {
-    try {
-      return TransactionConverterUtils.unwrap(client.startLong());
-    } catch (TException e) {
-      isValid.set(false);
-      throw e;
-    }
-  }
-
-  public Transaction startShort() throws TException {
-    try {
-      return TransactionConverterUtils.unwrap(client.startShort());
-    } catch (TException e) {
-      isValid.set(false);
-      throw e;
-    }
-  }
-
-  public Transaction startShort(int timeout) throws TException {
-    try {
-      return TransactionConverterUtils.unwrap(client.startShortTimeout(timeout));
-    } catch (TException e) {
-      isValid.set(false);
-      throw e;
-    }
-  }
-
-  public boolean canCommit(Transaction tx, Collection<byte[]> changeIds)
-    throws TException, TransactionNotInProgressException {
-    try {
-      return client.canCommitTx(TransactionConverterUtils.wrap(tx),
-                                ImmutableSet.copyOf(Iterables.transform(changeIds, BYTES_WRAPPER))).isValue();
-    } catch (TTransactionNotInProgressException e) {
-      throw new TransactionNotInProgressException(e.getMessage());
-    } catch (TException e) {
-      isValid.set(false);
-      throw e;
-    }
-  }
-
-
-
-  public boolean commit(Transaction tx) throws TException, TransactionNotInProgressException {
-    try {
-      return client.commitTx(TransactionConverterUtils.wrap(tx)).isValue();
-    } catch (TTransactionNotInProgressException e) {
-      throw new TransactionNotInProgressException(e.getMessage());
-    } catch (TException e) {
-      isValid.set(false);
-      throw e;
-    }
-  }
-
-  public void abort(Transaction tx) throws TException {
-    try {
-      client.abortTx(TransactionConverterUtils.wrap(tx));
-    } catch (TException e) {
-      isValid.set(false);
-      throw e;
-    }
-  }
-
-  public boolean invalidate(long tx) throws TException {
-    try {
-      return client.invalidateTx(tx);
-    } catch (TException e) {
-      isValid.set(false);
-      throw e;
-    }
-  }
-
-  public Transaction checkpoint(Transaction tx) throws TException {
-    try {
-      return TransactionConverterUtils.unwrap(client.checkpoint(TransactionConverterUtils.wrap(tx)));
-    } catch (TException e) {
-      isValid.set(false);
-      throw e;
-    }
-  }
-
-  public InputStream getSnapshotStream() throws TException, TransactionCouldNotTakeSnapshotException {
-    try {
-      ByteBuffer buffer = client.getSnapshot();
-      if (buffer.hasArray()) {
-        return new ByteArrayInputStream(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
-      }
-
-      // The ByteBuffer is not backed by array. Read the content to a new byte array and return an InputStream of that.
-      byte[] snapshot = new byte[buffer.remaining()];
-      buffer.get(snapshot);
-      return new ByteArrayInputStream(snapshot);
-    } catch (TTransactionCouldNotTakeSnapshotException e) {
-      throw new TransactionCouldNotTakeSnapshotException(e.getMessage());
-    } catch (TException e) {
-      isValid.set(false);
-      throw e;
-    }
-  }
-
-  public String status() throws TException {
-    try {
-      return client.status();
-    } catch (TException e) {
-      isValid.set(false);
-      throw e;
-    }
-  }
-
-  public void resetState() throws TException {
-    try {
-        client.resetState();
-    } catch (TException e) {
-      isValid.set(false);
-      throw e;
-    }
-  }
-
-  public boolean truncateInvalidTx(Set<Long> invalidTxIds) throws TException {
-    try {
-      return client.truncateInvalidTx(invalidTxIds).isValue();
-    } catch (TException e) {
-      isValid.set(false);
-      throw e;
-    }
-  }
-  
-  public boolean truncateInvalidTxBefore(long time) throws TException, InvalidTruncateTimeException {
-    try {
-      return client.truncateInvalidTxBefore(time).isValue();
-    } catch (TInvalidTruncateTimeException e) {
-      throw new InvalidTruncateTimeException(e.getMessage());
-    } catch (TException e) {
-      isValid.set(false);
-      throw e;
-    }
-  }
-  
-  public int getInvalidSize() throws TException {
-    try {
-      return client.invalidTxSize();
-    } catch (TException e) {
-      isValid.set(false);
-      throw e;
-    }
-  }
-
-  public boolean isValid() {
-    return isValid.get();
-  }
-}