You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 20:15:42 UTC

[42/56] [abbrv] [partial] incubator-tephra git commit: Rename package to org.apache.tephra

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/rpc/ThriftRPCServer.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/rpc/ThriftRPCServer.java b/tephra-core/src/main/java/co/cask/tephra/rpc/ThriftRPCServer.java
deleted file mode 100644
index 432ab2f..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/rpc/ThriftRPCServer.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.rpc;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.AbstractExecutionThreadService;
-import org.apache.thrift.TProcessor;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadedSelectorServerWithFix;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TNonblockingServerSocket;
-import org.apache.twill.common.Threads;
-import org.apache.twill.internal.utils.Networks;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * @param <T> The type of service handler interface.
- * @param <I> The type of the thrift service.
- */
-public final class ThriftRPCServer<T extends RPCServiceHandler, I> extends AbstractExecutionThreadService {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ThriftRPCServer.class);
-
-  private final String name;
-  private final int ioThreads;
-  private final int workerThreads;
-  private final int maxReadBufferBytes;
-  private final T serviceHandler;
-  private final TProcessor processor;
-
-  private InetSocketAddress bindAddress;
-  private ExecutorService executor;
-  private TServer server;
-
-  /**
-   * Creates a {@link Builder} for creating instance of {@link ThriftRPCServer}.
-   * @param serviceType Class of the thrift service.
-   * @param <I> Type of the thrift service.
-   * @return A {@link Builder}.
-   */
-  public static <I> Builder<I> builder(Class<I> serviceType) {
-    return new Builder<I>(serviceType);
-  }
-
-  /**
-   * Builder for creating an instance of ThriftRPCServer. By default, the instance created will bind to
-   * a random port, with 2 IO threads and worker threads equal to the number of CPU cores minus 2.
-   */
-  public static final class Builder<I> {
-    private final Class<I> serviceType;
-    private String name;
-    private InetSocketAddress bindAddress = new InetSocketAddress(0);
-    private int ioThreads = 2;
-    private int workerThreads = Runtime.getRuntime().availableProcessors() - 2;
-    // 16Mb
-    private int maxReadBufferBytes = 16 * 1024 * 1024;
-
-    private Builder(Class<I> serviceType) {
-      this.serviceType = serviceType;
-      this.name = serviceType.getSimpleName();
-    }
-
-    public Builder<I> setName(String name) {
-      this.name = name;
-      return this;
-    }
-
-    public Builder<I> setHost(String host) {
-      this.bindAddress = new InetSocketAddress(host, bindAddress.getPort());
-      return this;
-    }
-
-    public Builder<I> setPort(int port) {
-      this.bindAddress = new InetSocketAddress(bindAddress.getHostName(), port);
-      return this;
-    }
-
-    public Builder<I> setIOThreads(int count) {
-      this.ioThreads = count;
-      return this;
-    }
-
-    public Builder<I> setWorkerThreads(int count) {
-      this.workerThreads = count;
-      return this;
-    }
-
-    public Builder<I> setMaxReadBufferBytes(int maxReadBufferBytes) {
-      this.maxReadBufferBytes = maxReadBufferBytes;
-      return this;
-    }
-
-    public <T extends RPCServiceHandler> ThriftRPCServer<T, I> build(T serviceHandler) {
-      return new ThriftRPCServer<T, I>(bindAddress, ioThreads, workerThreads, maxReadBufferBytes,
-                                       serviceHandler, serviceType, name);
-    }
-  }
-
-  /**
-   * Creates a ThriftRPCServer with the given parameters.
-   *
-   * @param bindAddress The socket address for the server to listen on. If {@code null}, it will be bound to a
-   *                    random port on localhost.
-   * @param ioThreads Number of io threads.
-   * @param workerThreads Number of worker threads.
-   * @param serviceHandler Handler for handling client requests.
-   */
-  @SuppressWarnings("unchecked")
-  private ThriftRPCServer(InetSocketAddress bindAddress, int ioThreads,
-                          int workerThreads, int maxReadBufferBytes,
-                          T serviceHandler, Class<I> serviceType, String name) {
-    Preconditions.checkArgument(ioThreads > 0, "IO threads must be > 0.");
-    Preconditions.checkArgument(workerThreads > 0, "Worker threads must be > 0.");
-
-    this.bindAddress = bindAddress;
-    this.ioThreads = ioThreads;
-    this.workerThreads = workerThreads;
-    this.maxReadBufferBytes = maxReadBufferBytes;
-    this.serviceHandler = serviceHandler;
-    this.name = name;
-    this.processor = createProcessor((Class<T>) serviceHandler.getClass(), serviceType);
-  }
-
-  public InetSocketAddress getBindAddress() {
-    return bindAddress;
-  }
-
-  @Override
-  protected void startUp() throws Exception {
-    // Determines the address and port to listen on
-    InetSocketAddress listenOn = bindAddress;
-    if (listenOn == null || listenOn.getPort() <= 0) {
-      int port = Networks.getRandomPort();
-      if (listenOn == null) {
-        listenOn = new InetSocketAddress("localhost", port);
-      } else {
-        listenOn = new InetSocketAddress(listenOn.getAddress(), port);
-      }
-    }
-    bindAddress = listenOn;
-
-    executor = new ThreadPoolExecutor(0, workerThreads,
-                                      60L, TimeUnit.SECONDS,
-                                      new SynchronousQueue<Runnable>(),
-                                      Threads.createDaemonThreadFactory(String.format("%s-rpc-%%d", name)),
-                                      new ThreadPoolExecutor.CallerRunsPolicy());
-    serviceHandler.init();
-
-    TThreadedSelectorServerWithFix.Args args =
-      new TThreadedSelectorServerWithFix.Args(new TNonblockingServerSocket(listenOn))
-        .selectorThreads(ioThreads)
-        .protocolFactory(new TBinaryProtocol.Factory())
-        .transportFactory(new TFramedTransport.Factory())
-        .processor(processor)
-        .executorService(executor);
-
-    // ENG-443 - Set the max read buffer size. This is important because it
-    // prevents the server from throwing an OutOfMemoryError if someone telnets
-    // to the port it's running on.
-    args.maxReadBufferBytes = maxReadBufferBytes;
-    server = new TThreadedSelectorServerWithFix(args);
-    LOG.info("Starting RPC server for {}", name);
-  }
-
-  @Override
-  protected void shutDown() throws Exception {
-    serviceHandler.destroy();
-    executor.shutdownNow();
-    LOG.info("RPC server for {} stopped.", name);
-  }
-
-  @Override
-  protected void triggerShutdown() {
-    LOG.info("Request to stop RPC server for {}", name);
-    server.stop();
-  }
-
-  @Override
-  protected void run() throws Exception {
-    LOG.info("Running RPC server for {}", name);
-    server.serve();
-    LOG.info("Done running RPC server for {}", name);
-  }
-
-  @SuppressWarnings("unchecked")
-  private TProcessor createProcessor(final Class<T> handlerType, Class<I> serviceType) {
-    // Pick the Iface inner interface and the Processor class
-    Class<? extends TProcessor> processorType = null;
-    Class<?> ifaceType = null;
-    for (Class<?> clz : serviceType.getDeclaredClasses()) {
-      if (TProcessor.class.isAssignableFrom(clz)) {
-        processorType = (Class<? extends TProcessor>) clz;
-      } else if (clz.isInterface() && "Iface".equals(clz.getSimpleName())) {
-        ifaceType = clz;
-      }
-    }
-
-    Preconditions.checkArgument(processorType != null,
-                                "Missing TProcessor, %s is not a valid thrift service.", serviceType.getName());
-    Preconditions.checkArgument(ifaceType != null,
-                                "Missing Iface, %s is not a valid thrift service.", serviceType.getName());
-
-    // If handler already implements the Iface, simply delegate
-    if (ifaceType.isAssignableFrom(handlerType)) {
-      return createProxyProcessor(handlerType, processorType, ifaceType);
-    }
-
-    throw new IllegalArgumentException("Unsupported handler type.");
-  }
-
-  private TProcessor createProxyProcessor(final Class<T> handlerType,
-                                          Class<? extends TProcessor> processorType, Class<?> ifaceType) {
-
-    try {
-      // Map from Iface method to handlerType method to save reflection lookup
-      ImmutableMap.Builder<Method, Method> builder = ImmutableMap.builder();
-      for (Method method : ifaceType.getMethods()) {
-        Method handlerMethod = handlerType.getMethod(method.getName(), method.getParameterTypes());
-        if (!handlerMethod.isAccessible()) {
-          handlerMethod.setAccessible(true);
-        }
-        builder.put(method, handlerMethod);
-      }
-      final Map<Method, Method> methods = builder.build();
-
-      Object proxy = Proxy.newProxyInstance(ifaceType.getClassLoader(),
-                                            new Class[]{ifaceType}, new InvocationHandler() {
-        @Override
-        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
-          try {
-            return methods.get(method).invoke(serviceHandler, args);
-          } catch (InvocationTargetException e) {
-            if (e.getCause() != null) {
-              throw e.getCause();
-            } else {
-              throw e;
-            }
-          }
-        }
-      });
-
-      return processorType.getConstructor(ifaceType).newInstance(proxy);
-    } catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/rpc/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/rpc/package-info.java b/tephra-core/src/main/java/co/cask/tephra/rpc/package-info.java
deleted file mode 100644
index 2b5bccd..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/rpc/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package contains classes for writing RPC servers and clients in a simple manner.
- */
-package co.cask.tephra.rpc;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/ConfigModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/ConfigModule.java b/tephra-core/src/main/java/co/cask/tephra/runtime/ConfigModule.java
deleted file mode 100644
index 92df8cd..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/runtime/ConfigModule.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.runtime;
-
-import com.google.inject.AbstractModule;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Provides Guice bindings for {@link Configuration}.
- */
-public final class ConfigModule extends AbstractModule {
-
-  private final Configuration configuration;
-
-  public ConfigModule(Configuration configuration) {
-    this.configuration = configuration;
-  }
-
-  @Override
-  protected void configure() {
-    bind(Configuration.class).toInstance(configuration);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/DiscoveryModules.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/DiscoveryModules.java b/tephra-core/src/main/java/co/cask/tephra/runtime/DiscoveryModules.java
deleted file mode 100644
index 76cbbdf..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/runtime/DiscoveryModules.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.runtime;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Module;
-import com.google.inject.PrivateModule;
-import com.google.inject.Provides;
-import com.google.inject.Singleton;
-import org.apache.twill.common.Cancellable;
-import org.apache.twill.discovery.Discoverable;
-import org.apache.twill.discovery.DiscoveryService;
-import org.apache.twill.discovery.DiscoveryServiceClient;
-import org.apache.twill.discovery.InMemoryDiscoveryService;
-import org.apache.twill.discovery.ServiceDiscovered;
-import org.apache.twill.discovery.ZKDiscoveryService;
-import org.apache.twill.zookeeper.ZKClientService;
-
-/**
- * Provides access to Google Guice modules for in-memory, single-node, and distributed operation for
- * {@link DiscoveryService} and {@link DiscoveryServiceClient}.
- */
-public final class DiscoveryModules {
-
-  public Module getInMemoryModules() {
-    return new InMemoryDiscoveryModule();
-  }
-
-  public Module getSingleNodeModules() {
-    return new InMemoryDiscoveryModule();
-  }
-
-  public Module getDistributedModules() {
-    return new ZKDiscoveryModule();
-  }
-
-  private static final class InMemoryDiscoveryModule extends AbstractModule {
-
-    // static instance, so that a single in-memory discovery service is shared across the JVM
-    private static final InMemoryDiscoveryService IN_MEMORY_DISCOVERY_SERVICE = new InMemoryDiscoveryService();
-
-    @Override
-    protected void configure() {
-      InMemoryDiscoveryService discovery = IN_MEMORY_DISCOVERY_SERVICE;
-      bind(DiscoveryService.class).toInstance(discovery);
-      bind(DiscoveryServiceClient.class).toInstance(discovery);
-    }
-  }
-
-  private static final class ZKDiscoveryModule extends PrivateModule {
-
-    @Override
-    protected void configure() {
-      expose(DiscoveryService.class);
-      expose(DiscoveryServiceClient.class);
-    }
-
-    @Provides
-    @Singleton
-    private ZKDiscoveryService providesZKDiscoveryService(ZKClientService zkClient) {
-      return new ZKDiscoveryService(zkClient);
-    }
-
-    @Provides
-    @Singleton
-    private DiscoveryService providesDiscoveryService(final ZKClientService zkClient,
-                                                      final ZKDiscoveryService delegate) {
-      return new DiscoveryService() {
-        @Override
-        public Cancellable register(Discoverable discoverable) {
-          if (!zkClient.isRunning()) {
-            zkClient.startAndWait();
-          }
-          return delegate.register(discoverable);
-        }
-      };
-    }
-
-    @Provides
-    @Singleton
-    private DiscoveryServiceClient providesDiscoveryServiceClient(final ZKClientService zkClient,
-                                                                  final ZKDiscoveryService delegate) {
-      return new DiscoveryServiceClient() {
-        @Override
-        public ServiceDiscovered discover(String s) {
-          if (!zkClient.isRunning()) {
-            zkClient.startAndWait();
-          }
-          return delegate.discover(s);
-        }
-      };
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionClientModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionClientModule.java b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionClientModule.java
deleted file mode 100644
index c6df2b3..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionClientModule.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.runtime;
-
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.distributed.PooledClientProvider;
-import co.cask.tephra.distributed.ThreadLocalClientProvider;
-import co.cask.tephra.distributed.ThriftClientProvider;
-import com.google.inject.AbstractModule;
-import com.google.inject.Inject;
-import com.google.inject.Provider;
-import com.google.inject.Singleton;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.twill.discovery.DiscoveryServiceClient;
-
-/**
- * Provides Guice binding for {@link co.cask.tephra.distributed.ThriftClientProvider}.
- */
-public class TransactionClientModule extends AbstractModule {
-
-  @Override
-  protected void configure() {
-    bind(ThriftClientProvider.class).toProvider(ThriftClientProviderSupplier.class);
-  }
-
-  /**
-   * Provides implementation of {@link co.cask.tephra.distributed.ThriftClientProvider}
-   * based on configuration.
-   */
-  @Singleton
-  private static final class ThriftClientProviderSupplier implements Provider<ThriftClientProvider> {
-
-    private final Configuration cConf;
-    private DiscoveryServiceClient discoveryServiceClient;
-
-    @Inject
-    ThriftClientProviderSupplier(Configuration cConf) {
-      this.cConf = cConf;
-    }
-
-    @Inject(optional = true)
-    void setDiscoveryServiceClient(DiscoveryServiceClient discoveryServiceClient) {
-      this.discoveryServiceClient = discoveryServiceClient;
-    }
-
-    @Override
-    public ThriftClientProvider get() {
-      // configure the client provider
-      String provider = cConf.get(TxConstants.Service.CFG_DATA_TX_CLIENT_PROVIDER,
-                                  TxConstants.Service.DEFAULT_DATA_TX_CLIENT_PROVIDER);
-      ThriftClientProvider clientProvider;
-      if ("pool".equals(provider)) {
-        clientProvider = new PooledClientProvider(cConf, discoveryServiceClient);
-      } else if ("thread-local".equals(provider)) {
-        clientProvider = new ThreadLocalClientProvider(cConf, discoveryServiceClient);
-      } else {
-        String message = "Unknown Transaction Service Client Provider '" + provider + "'.";
-        throw new IllegalArgumentException(message);
-      }
-      return clientProvider;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionDistributedModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionDistributedModule.java b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionDistributedModule.java
deleted file mode 100644
index c88f742..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionDistributedModule.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.runtime;
-
-import co.cask.tephra.DefaultTransactionExecutor;
-import co.cask.tephra.TransactionExecutor;
-import co.cask.tephra.TransactionExecutorFactory;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.distributed.TransactionServiceClient;
-import co.cask.tephra.metrics.DefaultMetricsCollector;
-import co.cask.tephra.metrics.MetricsCollector;
-import co.cask.tephra.persist.HDFSTransactionStateStorage;
-import co.cask.tephra.persist.TransactionStateStorage;
-import co.cask.tephra.snapshot.SnapshotCodecProvider;
-import com.google.inject.AbstractModule;
-import com.google.inject.Singleton;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
-import com.google.inject.name.Names;
-
-/**
- * Guice bindings for running in distributed mode on a cluster.
- */
-final class TransactionDistributedModule extends AbstractModule {
-
-  @Override
-  protected void configure() {
-    bind(SnapshotCodecProvider.class).in(Singleton.class);
-    bind(TransactionStateStorage.class).annotatedWith(Names.named("persist"))
-      .to(HDFSTransactionStateStorage.class).in(Singleton.class);
-    bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class).in(Singleton.class);
-
-    bind(TransactionManager.class).in(Singleton.class);
-    bind(TransactionSystemClient.class).to(TransactionServiceClient.class).in(Singleton.class);
-    bind(MetricsCollector.class).to(DefaultMetricsCollector.class).in(Singleton.class);
-
-    install(new FactoryModuleBuilder()
-        .implement(TransactionExecutor.class, DefaultTransactionExecutor.class)
-        .build(TransactionExecutorFactory.class));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionInMemoryModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionInMemoryModule.java b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionInMemoryModule.java
deleted file mode 100644
index d9780e1..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionInMemoryModule.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.runtime;
-
-import co.cask.tephra.DefaultTransactionExecutor;
-import co.cask.tephra.TransactionExecutor;
-import co.cask.tephra.TransactionExecutorFactory;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.inmemory.InMemoryTxSystemClient;
-import co.cask.tephra.metrics.MetricsCollector;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.persist.NoOpTransactionStateStorage;
-import co.cask.tephra.persist.TransactionStateStorage;
-import co.cask.tephra.snapshot.SnapshotCodecProvider;
-
-import com.google.inject.AbstractModule;
-import com.google.inject.Singleton;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
-
-/**
- * Guice bindings for running completely in-memory (no persistence).  This should only be used for
- * test classes, as the transaction state cannot be recovered in the case of a failure.
- */
-public class TransactionInMemoryModule extends AbstractModule {
-  public TransactionInMemoryModule() {
-  }
-
-  @Override
-  protected void configure() {
-    bind(SnapshotCodecProvider.class).in(Singleton.class);
-    bind(TransactionStateStorage.class).to(NoOpTransactionStateStorage.class).in(Singleton.class);
-    bind(TransactionManager.class).in(Singleton.class);
-    bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Singleton.class);
-    // no metrics output for in-memory
-    bind(MetricsCollector.class).to(TxMetricsCollector.class);
-
-    install(new FactoryModuleBuilder()
-              .implement(TransactionExecutor.class, DefaultTransactionExecutor.class)
-              .build(TransactionExecutorFactory.class));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionLocalModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionLocalModule.java b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionLocalModule.java
deleted file mode 100644
index 720c84c..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionLocalModule.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.runtime;
-
-import co.cask.tephra.DefaultTransactionExecutor;
-import co.cask.tephra.TransactionExecutor;
-import co.cask.tephra.TransactionExecutorFactory;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionSystemClient;
-import co.cask.tephra.inmemory.InMemoryTxSystemClient;
-import co.cask.tephra.metrics.DefaultMetricsCollector;
-import co.cask.tephra.metrics.MetricsCollector;
-import co.cask.tephra.persist.LocalFileTransactionStateStorage;
-import co.cask.tephra.persist.TransactionStateStorage;
-import co.cask.tephra.snapshot.SnapshotCodecProvider;
-import com.google.inject.AbstractModule;
-import com.google.inject.Singleton;
-import com.google.inject.assistedinject.FactoryModuleBuilder;
-import com.google.inject.name.Names;
-
-/**
- * Guice bindings for running in single-node mode (persistence to local disk and in-memory client).
- */
-final class TransactionLocalModule extends AbstractModule {
-
-  @Override
-  protected void configure() {
-    bind(SnapshotCodecProvider.class).in(Singleton.class);
-    bind(TransactionStateStorage.class).annotatedWith(Names.named("persist"))
-      .to(LocalFileTransactionStateStorage.class).in(Singleton.class);
-    bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class).in(Singleton.class);
-
-    bind(TransactionManager.class).in(Singleton.class);
-    bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Singleton.class);
-    bind(MetricsCollector.class).to(DefaultMetricsCollector.class);
-
-    install(new FactoryModuleBuilder()
-              .implement(TransactionExecutor.class, DefaultTransactionExecutor.class)
-              .build(TransactionExecutorFactory.class));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionModules.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionModules.java b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionModules.java
deleted file mode 100644
index 551fb3a..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionModules.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.runtime;
-
-import com.google.inject.Module;
-
-/**
- * Provides access to Google Guice modules for in-memory, single-node, and distributed operation.
- */
-public class TransactionModules {
-  public TransactionModules() {
-  }
-
-  public Module getInMemoryModules() {
-    return new TransactionInMemoryModule(); // a new module instance on every call
-  }
-
-  public Module getSingleNodeModules() {
-    return new TransactionLocalModule(); // a new module instance on every call
-  }
-
-  public Module getDistributedModules() {
-    return new TransactionDistributedModule(); // a new module instance on every call
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionStateStorageProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionStateStorageProvider.java b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionStateStorageProvider.java
deleted file mode 100644
index 777354c..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionStateStorageProvider.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package co.cask.tephra.runtime;
-
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.persist.NoOpTransactionStateStorage;
-import co.cask.tephra.persist.TransactionStateStorage;
-import com.google.inject.Inject;
-import com.google.inject.Injector;
-import com.google.inject.Key;
-import com.google.inject.Provider;
-import com.google.inject.Singleton;
-import com.google.inject.name.Names;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * A Guice {@link Provider} for {@link TransactionStateStorage} that selects the implementation from configuration:
- * the binding named "persist" when persistence is enabled, otherwise {@link NoOpTransactionStateStorage}.
- */
-@Singleton
-public final class TransactionStateStorageProvider implements Provider<TransactionStateStorage> {
-
-  private final Configuration cConf;
-  private final Injector injector;
-
-  @Inject
-  TransactionStateStorageProvider(Configuration cConf, Injector injector) {
-    this.cConf = cConf;
-    this.injector = injector;
-  }
-
-  @Override
-  public TransactionStateStorage get() {
-    if (cConf.getBoolean(TxConstants.Manager.CFG_DO_PERSIST, true)) { // persistence is on unless disabled
-      return injector.getInstance(Key.get(TransactionStateStorage.class, Names.named("persist")));
-    } else {
-      return injector.getInstance(NoOpTransactionStateStorage.class);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/ZKModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/ZKModule.java b/tephra-core/src/main/java/co/cask/tephra/runtime/ZKModule.java
deleted file mode 100644
index ebc1df7..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/runtime/ZKModule.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.runtime;
-
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.zookeeper.TephraZKClientService;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.inject.AbstractModule;
-import com.google.inject.Provides;
-import com.google.inject.Singleton;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.twill.zookeeper.RetryStrategies;
-import org.apache.twill.zookeeper.ZKClient;
-import org.apache.twill.zookeeper.ZKClientService;
-import org.apache.twill.zookeeper.ZKClientServices;
-import org.apache.twill.zookeeper.ZKClients;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Provides Guice binding to {@link ZKClient} and {@link ZKClientService}.
- */
-public class ZKModule extends AbstractModule {
-
-  @Override
-  protected void configure() {
-    /**
-     * ZKClientService is provided by the provider method
-     * {@link #provideZKClientService(org.apache.hadoop.conf.Configuration)}.
-     */
-    bind(ZKClient.class).to(ZKClientService.class);
-  }
-
-  @Provides
-  @Singleton
-  private ZKClientService provideZKClientService(Configuration conf) {
-    String zkStr = conf.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM);
-    if (zkStr == null) {
-      // Fall back to the HBase ZooKeeper quorum setting.
-      zkStr = conf.get(TxConstants.HBase.ZOOKEEPER_QUORUM);
-    }
-
-    int timeOut = conf.getInt(TxConstants.HBase.ZK_SESSION_TIMEOUT, TxConstants.HBase.DEFAULT_ZK_SESSION_TIMEOUT);
-    ZKClientService zkClientService = new TephraZKClientService(zkStr, timeOut, null,
-                                                                ArrayListMultimap.<String, byte[]>create());
-    return ZKClientServices.delegate( // wrap the raw client: retry on failure, then re-watch on expire
-      ZKClients.reWatchOnExpire(
-        ZKClients.retryOnFailure(zkClientService, RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)
-        ) // NOTE(review): presumably a 500 ms base delay capped at 2000 ms -- confirm against Twill
-      )
-    );
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryDecoder.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryDecoder.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryDecoder.java
deleted file mode 100644
index 8203494..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryDecoder.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.snapshot;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * A decoder to help read snapshots in binary format.
- */
-public final class BinaryDecoder {
-
-  private final InputStream input;
-
-  /**
-   * @param input Stream to read from.
-   */
-  public BinaryDecoder(InputStream input) {
-    this.input = input;
-  }
-
-  /**
-   * Reads one zig-zag, variable-length encoded int from the input.
-   * @return the read number
-   * @throws java.io.IOException If there is IO error.
-   * @throws java.io.EOFException If end of file reached.
-   */
-  public int readInt() throws IOException {
-    int val = 0;
-    int shift = 0;
-    int b = readByte();
-    while (b > 0x7f) { // high bit set: more bytes follow
-      val ^= (b & 0x7f) << shift; // low 7 bits carry data, least-significant group first
-      shift += 7;
-      b = readByte();
-    }
-    val ^= b << shift;
-    return (val >>> 1) ^ -(val & 1); // undo the zig-zag mapping
-  }
-
-  /**
-   * Reads one zig-zag, variable-length encoded long from the input.
-   * @return the read number
-   * @throws java.io.IOException If there is IO error.
-   * @throws java.io.EOFException If end of file reached.
-   */
-  public long readLong() throws IOException {
-    long val = 0;
-    int shift = 0;
-    int b = readByte();
-    while (b > 0x7f) { // high bit set: more bytes follow
-      val ^= (long) (b & 0x7f) << shift; // widen before shifting so groups beyond bit 31 are kept
-      shift += 7;
-      b = readByte();
-    }
-    val ^= (long) b << shift;
-    return (val >>> 1) ^ -(val & 1); // undo the zig-zag mapping
-  }
-
-  /**
-   * Reads a byte sequence: first an int length (see {@link #readInt()}), then that many bytes.
-   * @return the read bytes as a byte array
-   * @throws java.io.IOException If there is IO error.
-   * @throws java.io.EOFException If end of file reached.
-   */
-  public byte[] readBytes() throws IOException {
-    int toRead = readInt();
-    byte[] bytes = new byte[toRead]; // NOTE(review): a negative length prefix is not validated here
-    while (toRead > 0) {
-      int byteRead = input.read(bytes, bytes.length - toRead, toRead); // may return fewer bytes than asked for
-      if (byteRead == -1) {
-        throw new EOFException();
-      }
-      toRead -= byteRead;
-    }
-    return bytes;
-  }
-
-  /**
-   * Reads a single byte value.
-   *
-   * @return The byte value read, as returned by {@link InputStream#read()}.
-   * @throws java.io.IOException If there is IO error.
-   * @throws java.io.EOFException If end of file reached.
-   */
-  private int readByte() throws IOException {
-    int b = input.read();
-    if (b == -1) {
-      throw new EOFException();
-    }
-    return b;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryEncoder.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryEncoder.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryEncoder.java
deleted file mode 100644
index bc1bce0..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryEncoder.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.snapshot;
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-/**
- * An encoder to help encode snapshots in binary format.
- */
-public final class BinaryEncoder {
-
-  private final OutputStream output;
-
-  /**
-   * @param output stream to write to
-   */
-  public BinaryEncoder(OutputStream output) {
-    this.output = output;
-  }
-
-  /**
-   * Writes a single int value in zig-zag, variable-length form (7 data bits per byte) and returns this encoder.
-   * @throws java.io.IOException If there is IO error.
-   */
-  public BinaryEncoder writeInt(int i) throws IOException {
-    // Compute the zig-zag value. First double the value and flip the bit if the input is negative.
-    int val = (i << 1) ^ (i >> 31);
-
-    if ((val & ~0x7f) != 0) { // does not fit in 7 bits
-      output.write(0x80 | val & 0x7f); // low 7 bits with the continuation bit set
-      val >>>= 7; // unsigned shift, so val is non-negative from here on
-      while (val > 0x7f) {
-        output.write(0x80 | val & 0x7f);
-        val >>>= 7;
-      }
-    }
-    output.write(val); // final byte, continuation bit clear
-
-    return this;
-  }
-
-  /**
-   * Writes a single long value in zig-zag, variable-length form (7 data bits per byte) and returns this encoder.
-   * @throws java.io.IOException If there is IO error.
-   */
-  public BinaryEncoder writeLong(long l) throws IOException {
-    // Compute the zig-zag value. First double the value and flip the bit if the input is negative.
-    long val = (l << 1) ^ (l >> 63);
-
-    if ((val & ~0x7f) != 0) { // does not fit in 7 bits
-      output.write((int) (0x80 | val & 0x7f)); // low 7 bits with the continuation bit set
-      val >>>= 7; // unsigned shift, so val is non-negative from here on
-      while (val > 0x7f) {
-        output.write((int) (0x80 | val & 0x7f));
-        val >>>= 7;
-      }
-    }
-    output.write((int) val); // final byte, continuation bit clear
-
-    return this;
-  }
-
-  /**
-   * Writes a sequence of bytes: first the length (through {@link #writeLong(long)}), then the bytes themselves.
-   * @throws java.io.IOException If there is IO error.
-   */
-  public BinaryEncoder writeBytes(byte[] bytes) throws IOException {
-    writeLong(bytes.length); // for a non-negative length this emits the same bytes as writeInt would
-    output.write(bytes, 0, bytes.length);
-    return this;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/DefaultSnapshotCodec.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/DefaultSnapshotCodec.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/DefaultSnapshotCodec.java
deleted file mode 100644
index d09d79d..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/snapshot/DefaultSnapshotCodec.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.snapshot;
-
-import co.cask.tephra.ChangeId;
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.persist.TransactionSnapshot;
-import co.cask.tephra.persist.TransactionVisibilityState;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.TreeMap;
-
-/**
- * Handles serialization/deserialization of a {@link co.cask.tephra.persist.TransactionSnapshot}
- * and its elements to and from streams; implements version 1 of the encoding.
- * @deprecated This codec is now deprecated and is replaced by {@link SnapshotCodecV2}.
- */
-@Deprecated
-public class DefaultSnapshotCodec implements SnapshotCodec {
-
-  private static final Logger LOG = LoggerFactory.getLogger(DefaultSnapshotCodec.class);
-
-  @Override
-  public int getVersion() {
-    return 1;
-  }
-
-  @Override
-  public void encode(OutputStream out, TransactionSnapshot snapshot) {
-    try {
-      BinaryEncoder encoder = new BinaryEncoder(out);
-
-      encoder.writeLong(snapshot.getTimestamp()); // field order here must match the decode methods
-      encoder.writeLong(snapshot.getReadPointer());
-      encoder.writeLong(snapshot.getWritePointer());
-      encodeInvalid(encoder, snapshot.getInvalid());
-      encodeInProgress(encoder, snapshot.getInProgress());
-      encodeChangeSets(encoder, snapshot.getCommittingChangeSets());
-      encodeChangeSets(encoder, snapshot.getCommittedChangeSets());
-
-    } catch (IOException e) {
-      LOG.error("Unable to serialize transaction state: ", e);
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public TransactionSnapshot decode(InputStream in) {
-    BinaryDecoder decoder = new BinaryDecoder(in); // used for the change sets after the visibility state
-
-    try {
-      TransactionVisibilityState minTxSnapshot = decodeTransactionVisibilityState(in);
-      NavigableMap<Long, Set<ChangeId>> committing = decodeChangeSets(decoder);
-      NavigableMap<Long, Set<ChangeId>> committed = decodeChangeSets(decoder);
-      return new TransactionSnapshot(minTxSnapshot.getTimestamp(), minTxSnapshot.getReadPointer(),
-                                     minTxSnapshot.getWritePointer(), minTxSnapshot.getInvalid(),
-                                     minTxSnapshot.getInProgress(), committing, committed);
-    } catch (IOException e) {
-      LOG.error("Unable to deserialize transaction state: ", e);
-      throw Throwables.propagate(e);
-    }
-  }
-
-  @Override
-  public TransactionVisibilityState decodeTransactionVisibilityState(InputStream in) {
-    BinaryDecoder decoder = new BinaryDecoder(in);
-    try {
-      long timestamp = decoder.readLong();
-      long readPointer = decoder.readLong();
-      long writePointer = decoder.readLong();
-      Collection<Long> invalid = decodeInvalid(decoder);
-      NavigableMap<Long, TransactionManager.InProgressTx> inProgress = decodeInProgress(decoder);
-      return new TransactionSnapshot(timestamp, readPointer, writePointer, invalid, inProgress);
-    } catch (IOException e) {
-      LOG.error("Unable to deserialize transaction state: ", e);
-      throw Throwables.propagate(e);
-    }
-  }
-
-  private void encodeInvalid(BinaryEncoder encoder, Collection<Long> invalid) throws IOException {
-    if (!invalid.isEmpty()) { // an empty collection is written as just the terminator below
-      encoder.writeInt(invalid.size());
-      for (long invalidTx : invalid) {
-        encoder.writeLong(invalidTx);
-      }
-    }
-    encoder.writeInt(0); // zero denotes end of list as per AVRO spec
-  }
-
-  private Collection<Long> decodeInvalid(BinaryDecoder decoder) throws IOException {
-    int size = decoder.readInt();
-    Collection<Long> invalid = Lists.newArrayListWithCapacity(size);
-    while (size != 0) { // zero denotes end of list as per AVRO spec
-      for (int remaining = size; remaining > 0; --remaining) {
-        invalid.add(decoder.readLong());
-      }
-      size = decoder.readInt(); // next block size, or the zero terminator
-    }
-    return invalid;
-  }
-
-  protected void encodeInProgress(BinaryEncoder encoder, Map<Long, TransactionManager.InProgressTx> inProgress)
-    throws IOException {
-
-    if (!inProgress.isEmpty()) { // an empty map is written as just the terminator below
-      encoder.writeInt(inProgress.size());
-      for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) {
-        encoder.writeLong(entry.getKey()); // tx id
-        encoder.writeLong(entry.getValue().getExpiration());
-        encoder.writeLong(entry.getValue().getVisibilityUpperBound());
-      }
-    }
-    encoder.writeInt(0); // zero denotes end of list as per AVRO spec
-  }
-
-  protected NavigableMap<Long, TransactionManager.InProgressTx> decodeInProgress(BinaryDecoder decoder)
-    throws IOException {
-
-    int size = decoder.readInt();
-    NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
-    while (size != 0) { // zero denotes end of list as per AVRO spec
-      for (int remaining = size; remaining > 0; --remaining) {
-        long txId = decoder.readLong();
-        long expiration = decoder.readLong();
-        long visibilityUpperBound = decoder.readLong();
-        inProgress.put(txId, // wire order is expiration then bound; the constructor call passes them reversed
-                       new TransactionManager.InProgressTx(visibilityUpperBound, expiration));
-      }
-      size = decoder.readInt(); // next block size, or the zero terminator
-    }
-    return inProgress;
-  }
-
-  private void encodeChangeSets(BinaryEncoder encoder, Map<Long, Set<ChangeId>> changes) throws IOException {
-    if (!changes.isEmpty()) { // an empty map is written as just the terminator below
-      encoder.writeInt(changes.size());
-      for (Map.Entry<Long, Set<ChangeId>> entry : changes.entrySet()) {
-        encoder.writeLong(entry.getKey());
-        encodeChanges(encoder, entry.getValue());
-      }
-    }
-    encoder.writeInt(0); // zero denotes end of list as per AVRO spec
-  }
-
-  private NavigableMap<Long, Set<ChangeId>> decodeChangeSets(BinaryDecoder decoder) throws IOException {
-    int size = decoder.readInt();
-    NavigableMap<Long, Set<ChangeId>> changeSets = new TreeMap<Long, Set<ChangeId>>();
-    while (size != 0) { // zero denotes end of list as per AVRO spec
-      for (int remaining = size; remaining > 0; --remaining) {
-        changeSets.put(decoder.readLong(), decodeChanges(decoder));
-      }
-      size = decoder.readInt(); // next block size, or the zero terminator
-    }
-    return changeSets;
-  }
-
-  private void encodeChanges(BinaryEncoder encoder, Set<ChangeId> changes) throws IOException {
-    if (!changes.isEmpty()) { // an empty set is written as just the terminator below
-      encoder.writeInt(changes.size());
-      for (ChangeId change : changes) {
-        encoder.writeBytes(change.getKey());
-      }
-    }
-    encoder.writeInt(0); // zero denotes end of list as per AVRO spec
-  }
-
-  private Set<ChangeId> decodeChanges(BinaryDecoder decoder) throws IOException {
-    int size = decoder.readInt();
-    HashSet<ChangeId> changes = Sets.newHashSetWithExpectedSize(size);
-    while (size != 0) { // zero denotes end of list as per AVRO spec
-      for (int remaining = size; remaining > 0; --remaining) {
-        changes.add(new ChangeId(decoder.readBytes()));
-      }
-      size = decoder.readInt(); // next block size, or the zero terminator
-    }
-    // todo is there an immutable hash set?
-    return changes;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodec.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodec.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodec.java
deleted file mode 100644
index 8e9624e..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodec.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.snapshot;
-
-import co.cask.tephra.persist.TransactionSnapshot;
-import co.cask.tephra.persist.TransactionVisibilityState;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * Interface to decode and encode a transaction snapshot. Each codec implements one version of the encoding.
- * A codec need not write the version itself when encoding a snapshot ({@link SnapshotCodecProvider} does).
- */
-public interface SnapshotCodec {
-
-  /**
-   * @return the version of the encoding implemented by the codec.
-   */
-  int getVersion();
-
-  /**
-   * Encode a transaction snapshot into an output stream.
-   * @param out the output stream to write to
-   * @param snapshot the snapshot to encode
-   */
-  void encode(OutputStream out, TransactionSnapshot snapshot);
-
-  /**
-   * Decode a transaction snapshot from an input stream.
-   * @param in the input stream to read from
-   * @return the decoded snapshot
-   */
-  TransactionSnapshot decode(InputStream in);
-
-  /**
-   * Decode only the transaction visibility state from an input stream.
-   * @param in the input stream to read from
-   * @return {@link TransactionVisibilityState}
-   */
-  TransactionVisibilityState decodeTransactionVisibilityState(InputStream in);
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecProvider.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecProvider.java
deleted file mode 100644
index 28d829c..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecProvider.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.snapshot;
-
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.persist.TransactionSnapshot;
-import co.cask.tephra.persist.TransactionVisibilityState;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.inject.Inject;
-import org.apache.hadoop.conf.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.List;
-import java.util.SortedMap;
-import javax.annotation.Nonnull;
-
-/**
- * Maintains the codecs for all known versions of the transaction snapshot encoding.
- */
-public class SnapshotCodecProvider implements SnapshotCodec {
-
-  private static final Logger LOG = LoggerFactory.getLogger(SnapshotCodecProvider.class);
-
-  private final SortedMap<Integer, SnapshotCodec> codecs = Maps.newTreeMap();
-
-  @Inject
-  public SnapshotCodecProvider(Configuration configuration) {
-    initialize(configuration);
-  }
-
-  /**
-   * Registers all codecs specified in the configuration with this provider, or the defaults if none can be loaded.
-   * Only one codec is kept per version; a later codec with the same version replaces an earlier one.
-   */
-  private void initialize(Configuration configuration) {
-    String[] codecClassNames = configuration.getTrimmedStrings(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
-    List<Class> codecClasses = Lists.newArrayList();
-    if (codecClassNames != null) {
-      for (String clsName : codecClassNames) {
-        try {
-          codecClasses.add(Class.forName(clsName));
-        } catch (ClassNotFoundException cnfe) { // an unloadable class is logged and skipped
-          LOG.warn("Unable to load class configured for " + TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES
-              + ": " + clsName, cnfe);
-        }
-      }
-    }
-
-    if (codecClasses.size() == 0) { // nothing configured, or nothing could be loaded: use the defaults
-      codecClasses.addAll(Arrays.asList(TxConstants.Persist.DEFAULT_TX_SNAPHOT_CODEC_CLASSES));
-    }
-    for (Class<?> codecClass : codecClasses) {
-      try {
-        SnapshotCodec codec = (SnapshotCodec) (codecClass.newInstance());
-        codecs.put(codec.getVersion(), codec);
-        LOG.debug("Using snapshot codec {} for snapshots of version {}", codecClass.getName(), codec.getVersion());
-      } catch (Exception e) {
-        LOG.warn("Error instantiating snapshot codec {}. Skipping.", codecClass.getName(), e);
-      }
-    }
-  }
-
-  /**
-   * Retrieve the codec for a particular version of the encoding.
-   * @param version the version of interest
-   * @return the corresponding codec
-   * @throws java.lang.IllegalArgumentException if the version is not known
-   */
-  @Nonnull
-  @VisibleForTesting
-  SnapshotCodec getCodecForVersion(int version) {
-    SnapshotCodec codec = codecs.get(version);
-    if (codec == null) {
-      throw new IllegalArgumentException(String.format("Version %d of snapshot encoding is not supported", version));
-    }
-    return codec;
-  }
-
-  /**
-   * Retrieve the current snapshot codec, that is, the codec with the highest known version.
-   * @return the current codec
-   * @throws java.lang.IllegalStateException if no codecs are registered
-   */
-  private SnapshotCodec getCurrentCodec() {
-    if (codecs.isEmpty()) {
-      throw new IllegalStateException(String.format("No codecs are registered."));
-    }
-    return codecs.get(codecs.lastKey()); // the map is sorted by version, so the last key is the highest
-  }
-
-  // Reads the version prefix from the stream and returns the codec registered for that version.
-  private SnapshotCodec getCodec(InputStream in) {
-    BinaryDecoder decoder = new BinaryDecoder(in);
-    int persistedVersion;
-    try {
-      persistedVersion = decoder.readInt();
-    } catch (IOException e) {
-      LOG.error("Unable to read transaction state version: ", e);
-      throw Throwables.propagate(e);
-    }
-    return getCodecForVersion(persistedVersion);
-  }
-
-  @Override
-  public int getVersion() {
-    return getCurrentCodec().getVersion();
-  }
-
-  @Override
-  public TransactionSnapshot decode(InputStream in) {
-    return getCodec(in).decode(in); // getCodec consumes the version prefix before the codec reads the rest
-  }
-
-  @Override
-  public TransactionVisibilityState decodeTransactionVisibilityState(InputStream in) {
-    return getCodec(in).decodeTransactionVisibilityState(in); // version prefix is consumed first
-  }
-
-  @Override
-  public void encode(OutputStream out, TransactionSnapshot snapshot) {
-    SnapshotCodec codec = getCurrentCodec();
-    try {
-      new BinaryEncoder(out).writeInt(codec.getVersion()); // version prefix, read back by getCodec
-    } catch (IOException e) {
-      LOG.error("Unable to write transaction state version: ", e);
-      throw Throwables.propagate(e);
-    }
-    codec.encode(out, snapshot);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV2.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV2.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV2.java
deleted file mode 100644
index 5d07ba5..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV2.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.snapshot;
-
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionType;
-import com.google.common.collect.Maps;
-import it.unimi.dsi.fastutil.longs.LongArrayList;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.NavigableMap;
-
-/**
- * Handles serialization/deserialization of a {@link co.cask.tephra.persist.TransactionSnapshot}
- * and its elements to {@code byte[]}.
- *
- * <p>This version overrides how the in-progress transaction map is written and read. Each entry is
- * stored as the transaction id, its expiration, its visibility upper bound and the ordinal of its
- * {@link TransactionType}. Entries are preceded by their count and the list is terminated by a zero.</p>
- */
-public class SnapshotCodecV2 extends DefaultSnapshotCodec {
-  @Override
-  public int getVersion() {
-    return 2;
-  }
-
-  /**
-   * Writes the in-progress transactions as a counted block of entries followed by a terminating zero.
-   *
-   * @param encoder the encoder to write to
-   * @param inProgress the in-progress transactions, keyed by transaction id
-   * @throws IOException if writing to the encoder fails
-   */
-  @Override
-  protected void encodeInProgress(BinaryEncoder encoder, Map<Long, TransactionManager.InProgressTx> inProgress)
-    throws IOException {
-
-    if (!inProgress.isEmpty()) {
-      encoder.writeInt(inProgress.size());
-      for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) {
-        TransactionManager.InProgressTx tx = entry.getValue();
-        encoder.writeLong(entry.getKey()); // tx id
-        encoder.writeLong(tx.getExpiration());
-        encoder.writeLong(tx.getVisibilityUpperBound());
-        // The persisted format stores the enum ordinal; decodeInProgress maps it back.
-        encoder.writeInt(tx.getType().ordinal());
-      }
-    }
-    encoder.writeInt(0); // zero denotes end of list as per AVRO spec
-  }
-
-  /**
-   * Reads the in-progress transactions written by {@link #encodeInProgress}.
-   *
-   * <p>Transactions decoded by this version always get an empty list of checkpoint write pointers.</p>
-   *
-   * @param decoder the decoder to read from
-   * @return the decoded transactions, sorted by transaction id
-   * @throws IOException if reading fails or a persisted transaction type ordinal is out of range
-   */
-  @Override
-  protected NavigableMap<Long, TransactionManager.InProgressTx> decodeInProgress(BinaryDecoder decoder)
-    throws IOException {
-
-    int size = decoder.readInt();
-    NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
-    // values() returns a fresh array on every call, so fetch it once for the whole decode.
-    TransactionType[] txTypes = TransactionType.values();
-    while (size != 0) { // zero denotes end of list as per AVRO spec
-      for (int remaining = size; remaining > 0; --remaining) {
-        long txId = decoder.readLong();
-        long expiration = decoder.readLong();
-        long visibilityUpperBound = decoder.readLong();
-        int txTypeIdx = decoder.readInt();
-        // Validate the ordinal explicitly rather than catching ArrayIndexOutOfBoundsException.
-        if (txTypeIdx < 0 || txTypeIdx >= txTypes.length) {
-          throw new IOException("Type enum ordinal value is out of range: " + txTypeIdx);
-        }
-        TransactionType txType = txTypes[txTypeIdx];
-        inProgress.put(txId,
-                       new TransactionManager.InProgressTx(visibilityUpperBound, expiration, txType,
-                           new LongArrayList()));
-      }
-      size = decoder.readInt();
-    }
-    return inProgress;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV3.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV3.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV3.java
deleted file mode 100644
index b11821a..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV3.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.snapshot;
-
-/**
- * Snapshot codec for a {@link co.cask.tephra.persist.TransactionSnapshot} that reports version number 3.
- *
- * <p>Only the version is overridden here, so the serialization/deserialization is the same as that
- * performed by {@link SnapshotCodecV2}. The separate version number allows easy migration from projects
- * using deprecated codecs with conflicting version numbers.</p>
- */
-public class SnapshotCodecV3 extends SnapshotCodecV2 {
-  /** Version number reported by this codec. */
-  private static final int VERSION = 3;
-
-  @Override
-  public int getVersion() {
-    return VERSION;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV4.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV4.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV4.java
deleted file mode 100644
index 41d30f2..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV4.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.snapshot;
-
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TransactionType;
-import com.google.common.collect.Maps;
-import it.unimi.dsi.fastutil.longs.LongArrayList;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.NavigableMap;
-
-/**
- * Handles serialization/deserialization of a {@link co.cask.tephra.persist.TransactionSnapshot}
- * and its elements to {@code byte[]}.
- *
- */
-public class SnapshotCodecV4 extends SnapshotCodecV2 {
-  @Override
-  public int getVersion() {
-    return 4;
-  }
-
-  @Override
-  protected void encodeInProgress(BinaryEncoder encoder, Map<Long, TransactionManager.InProgressTx> inProgress)
-      throws IOException {
-
-    if (!inProgress.isEmpty()) {
-      encoder.writeInt(inProgress.size());
-      for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) {
-        encoder.writeLong(entry.getKey()); // tx id
-        encoder.writeLong(entry.getValue().getExpiration());
-        encoder.writeLong(entry.getValue().getVisibilityUpperBound());
-        encoder.writeInt(entry.getValue().getType().ordinal());
-        // write checkpoint tx IDs
-        LongArrayList checkpointPointers = entry.getValue().getCheckpointWritePointers();
-        if (checkpointPointers != null && !checkpointPointers.isEmpty()) {
-          encoder.writeInt(checkpointPointers.size());
-          for (int i = 0; i < checkpointPointers.size(); i++) {
-            encoder.writeLong(checkpointPointers.getLong(i));
-          }
-        }
-        encoder.writeInt(0);
-      }
-    }
-    encoder.writeInt(0); // zero denotes end of list as per AVRO spec
-  }
-
-  @Override
-  protected NavigableMap<Long, TransactionManager.InProgressTx> decodeInProgress(BinaryDecoder decoder)
-      throws IOException {
-
-    int size = decoder.readInt();
-    NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
-    while (size != 0) { // zero denotes end of list as per AVRO spec
-      for (int remaining = size; remaining > 0; --remaining) {
-        long txId = decoder.readLong();
-        long expiration = decoder.readLong();
-        long visibilityUpperBound = decoder.readLong();
-        int txTypeIdx = decoder.readInt();
-        TransactionType txType;
-        try {
-          txType = TransactionType.values()[txTypeIdx];
-        } catch (ArrayIndexOutOfBoundsException e) {
-          throw new IOException("Type enum ordinal value is out of range: " + txTypeIdx);
-        }
-        // read checkpoint tx IDs
-        int checkpointPointerSize = decoder.readInt();
-        LongArrayList checkpointPointers = new LongArrayList(checkpointPointerSize);
-        while (checkpointPointerSize != 0) {
-          for (int checkpointRemaining = checkpointPointerSize; checkpointRemaining > 0; --checkpointRemaining) {
-            checkpointPointers.add(decoder.readLong());
-          }
-          checkpointPointerSize = decoder.readInt();
-        }
-        inProgress.put(txId,
-            new TransactionManager.InProgressTx(visibilityUpperBound, expiration, txType, checkpointPointers));
-      }
-      size = decoder.readInt();
-    }
-    return inProgress;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/package-info.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/package-info.java
deleted file mode 100644
index e2d044c..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/snapshot/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This package contains interfaces and implementations for encoding and decoding transaction snapshots.
- */
-package co.cask.tephra.snapshot;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationFactory.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationFactory.java b/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationFactory.java
deleted file mode 100644
index e830524..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationFactory.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.util;
-
-import com.google.inject.Provider;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Provides {@code org.apache.hadoop.conf.Configuration} instances, constructed by the correct version used
- * for the runtime.
- */
-public class ConfigurationFactory implements Provider<Configuration> {
-  private static class ConfigurationProviderFactory extends HBaseVersionSpecificFactory<ConfigurationProvider> {
-    @Override
-    protected String getHBase96Classname() {
-      return "co.cask.tephra.hbase96.HBase96ConfigurationProvider";
-    }
-
-    @Override
-    protected String getHBase98Classname() {
-      return "co.cask.tephra.hbase98.HBase98ConfigurationProvider";
-    }
-
-    @Override
-    protected String getHBase10Classname() {
-      return "co.cask.tephra.hbase10.HBase10ConfigurationProvider";
-    }
-
-    @Override
-    protected String getHBase11Classname() {
-      return "co.cask.tephra.hbase11.HBase11ConfigurationProvider";
-    }
-
-    @Override
-    protected String getHBase10CDHClassname() {
-      return "co.cask.tephra.hbase10cdh.HBase10ConfigurationProvider";
-    }
-  }
-
-  private final ConfigurationProvider provider = new ConfigurationProviderFactory().get();
-
-  /**
-   * Returns a new {@link org.apache.hadoop.conf.Configuration} instance from the HBase version-specific factory.
-   */
-  @Override
-  public Configuration get() {
-    return provider.get();
-  }
-
-  /**
-   * Returns a new {@link org.apache.hadoop.conf.Configuration} instance from the HBase version-specific factory.
-   *
-   * @param baseConf additional configuration properties to merge on to the classpath configuration
-   * @return the merged configuration
-   */
-  public Configuration get(Configuration baseConf) {
-    return provider.get(baseConf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationProvider.java b/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationProvider.java
deleted file mode 100644
index e88acf8..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationProvider.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.util;
-
-import com.google.inject.Provider;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * Provides {@code Configuration} instances, constructed by the HBase version on which we are running.
- *
- * <p>This class only declares the contract; both methods are abstract and must be supplied by a subclass.</p>
- */
-public abstract class ConfigurationProvider implements Provider<Configuration> {
-  /**
-   * Returns a {@code Configuration} instance.
-   *
-   * @return the configuration supplied by the implementing subclass
-   */
-  @Override
-  public abstract Configuration get();
-
-  /**
-   * Returns a {@code Configuration} instance derived from the given base configuration.
-   *
-   * @param baseConf the configuration to start from; presumably its properties are merged into the
-   *     result (how is up to the implementing subclass, so confirm against the implementations)
-   * @return the configuration supplied by the implementing subclass
-   */
-  public abstract Configuration get(Configuration baseConf);
-}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersion.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersion.java b/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersion.java
deleted file mode 100644
index 84311c9..0000000
--- a/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersion.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package co.cask.tephra.util;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Method;
-import java.text.ParseException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Detects the currently loaded HBase version.  It is assumed that only one HBase version is loaded at a time,
- * since using more than one HBase version within the same process will require classloader isolation anyway.
- */
-public class HBaseVersion {
-  private static final String HBASE_94_VERSION = "0.94";
-  private static final String HBASE_96_VERSION = "0.96";
-  private static final String HBASE_98_VERSION = "0.98";
-  private static final String HBASE_10_VERSION = "1.0";
-  private static final String HBASE_11_VERSION = "1.1";
-  private static final String HBASE_12_VERSION = "1.2";
-  private static final String CDH_CLASSIFIER = "cdh";
-
-  private static final Logger LOG = LoggerFactory.getLogger(HBaseVersion.class);
-
-  /**
-   * Represents the major version of the HBase library that is currently loaded.
-   */
-  public enum Version {
-    HBASE_94("0.94"),
-    HBASE_96("0.96"),
-    HBASE_98("0.98"),
-    HBASE_10("1.0"),
-    HBASE_10_CDH("1.0-cdh"),
-    HBASE_11("1.1"),
-    HBASE_12_CDH("1.2-cdh"),
-    UNKNOWN("unknown");
-
-    final String majorVersion;
-
-    Version(String majorVersion) {
-      this.majorVersion = majorVersion;
-    }
-
-    public String getMajorVersion() {
-      return majorVersion;
-    }
-  }
-
-  private static Version currentVersion;
-  private static String versionString;
-  static {
-    try {
-      Class versionInfoClass = Class.forName("org.apache.hadoop.hbase.util.VersionInfo");
-      Method versionMethod = versionInfoClass.getMethod("getVersion");
-      versionString = (String) versionMethod.invoke(null);
-      if (versionString.startsWith(HBASE_94_VERSION)) {
-        currentVersion = Version.HBASE_94;
-      } else if (versionString.startsWith(HBASE_96_VERSION)) {
-        currentVersion = Version.HBASE_96;
-      } else if (versionString.startsWith(HBASE_98_VERSION)) {
-        currentVersion = Version.HBASE_98;
-      } else if (versionString.startsWith(HBASE_10_VERSION)) {
-        VersionNumber ver = VersionNumber.create(versionString);
-        if (ver.getClassifier() != null && ver.getClassifier().startsWith(CDH_CLASSIFIER)) {
-          currentVersion = Version.HBASE_10_CDH;
-        } else {
-          currentVersion = Version.HBASE_10;
-        }
-      } else if (versionString.startsWith(HBASE_11_VERSION)) {
-        currentVersion = Version.HBASE_11;
-      } else if (versionString.startsWith(HBASE_12_VERSION)) {
-        VersionNumber ver = VersionNumber.create(versionString);
-        if (ver.getClassifier() != null && ver.getClassifier().startsWith(CDH_CLASSIFIER)) {
-          currentVersion = Version.HBASE_12_CDH;
-        } else {
-          // CDH 5.7 comes with HBase version 1.2.0-CDH5.7.0. However currently there is no
-          // other hadoop distribution that uses HBase 1.2, so the version is set here to UNKNOWN.
-          currentVersion = Version.UNKNOWN;
-        }
-      } else {
-        currentVersion = Version.UNKNOWN;
-      }
-    } catch (Throwable e) {
-      // must be a class loading exception, HBase is not there
-      LOG.error("Unable to determine HBase version from string '{}', are HBase classes available?", versionString);
-      LOG.error("Exception was: ", e);
-      currentVersion = Version.UNKNOWN;
-    }
-  }
-
-  /**
-   * Returns the major version of the currently loaded HBase library.
-   */
-  public static Version get() {
-    return currentVersion;
-  }
-
-  /**
-   * Returns the full version string for the currently loaded HBase library.
-   */
-  public static String getVersionString() {
-    return versionString;
-  }
-
-  /**
-   * Prints out the HBase {@link Version} enum value for the current version of HBase on the classpath.
-   */
-  public static void main(String[] args) {
-    boolean verbose = args.length == 1 && "-v".equals(args[0]);
-    Version version = HBaseVersion.get();
-    System.out.println(version.getMajorVersion());
-    if (verbose) {
-      System.out.println("versionString=" + getVersionString());
-    }
-  }
-
-  /**
-   * Utility class to parse apart version number components.  The version string provided is expected to be in
-   * the format: major[.minor[.patch[.last]][-classifier][-SNAPSHOT]
-   *
-   * <p>Only the major version number is actually required.</p>
-   */
-  public static class VersionNumber {
-    private static final Pattern PATTERN =
-        Pattern.compile("(\\d+)(\\.(\\d+))?(\\.(\\d+))?(\\.(\\d+))?(\\-(?!SNAPSHOT)([^\\-]+))?(\\-SNAPSHOT)?");
-
-    private Integer major;
-    private Integer minor;
-    private Integer patch;
-    private Integer last;
-    private String classifier;
-    private boolean snapshot;
-
-    private VersionNumber(Integer major, Integer minor, Integer patch, Integer last,
-                          String classifier, boolean snapshot) {
-      this.major = major;
-      this.minor = minor;
-      this.patch = patch;
-      this.last = last;
-      this.classifier = classifier;
-      this.snapshot = snapshot;
-    }
-
-    public Integer getMajor() {
-      return major;
-    }
-
-    public Integer getMinor() {
-      return minor;
-    }
-
-    public Integer getPatch() {
-      return patch;
-    }
-
-    public Integer getLast() {
-      return last;
-    }
-
-    public String getClassifier() {
-      return classifier;
-    }
-
-    public boolean isSnapshot() {
-      return snapshot;
-    }
-
-    public static VersionNumber create(String versionString) throws ParseException {
-      Matcher matcher = PATTERN.matcher(versionString);
-      if (matcher.matches()) {
-        String majorString = matcher.group(1);
-        String minorString = matcher.group(3);
-        String patchString = matcher.group(5);
-        String last = matcher.group(7);
-        String classifier = matcher.group(9);
-        String snapshotString = matcher.group(10);
-        return new VersionNumber(new Integer(majorString),
-            minorString != null ? new Integer(minorString) : null,
-            patchString != null ? new Integer(patchString) : null,
-            last != null ? new Integer(last) : null,
-            classifier,
-            "-SNAPSHOT".equals(snapshotString));
-      }
-      throw new ParseException(
-          "Input string did not match expected pattern: major[.minor[.patch]][-classifier][-SNAPSHOT]", 0);
-    }
-  }
-}