You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tephra.apache.org by po...@apache.org on 2016/05/11 20:15:32 UTC

[32/56] [abbrv] [partial] incubator-tephra git commit: Rename package to org.apache.tephra

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/rpc/ThriftRPCServer.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/rpc/ThriftRPCServer.java b/tephra-core/src/main/java/org/apache/tephra/rpc/ThriftRPCServer.java
new file mode 100644
index 0000000..738d6c8
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/rpc/ThriftRPCServer.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tephra.rpc;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.AbstractExecutionThreadService;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadedSelectorServerWithFix;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.twill.common.Threads;
+import org.apache.twill.internal.utils.Networks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @param <T> The type of service handler interface.
+ * @param <I> The type of the thrift service.
+ */
+public final class ThriftRPCServer<T extends RPCServiceHandler, I> extends AbstractExecutionThreadService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ThriftRPCServer.class);
+
+  private final String name;
+  private final int ioThreads;
+  private final int workerThreads;
+  private final int maxReadBufferBytes;
+  private final T serviceHandler;
+  private final TProcessor processor;
+
+  private InetSocketAddress bindAddress;
+  private ExecutorService executor;
+  private TServer server;
+
+  /**
+   * Creates a {@link Builder} for creating instance of {@link ThriftRPCServer}.
+   * @param serviceType Class of the thrift service.
+   * @param <I> Type of the thrift service.
+   * @return A {@link Builder}.
+   */
+  public static <I> Builder<I> builder(Class<I> serviceType) {
+    return new Builder<I>(serviceType);
+  }
+
+  /**
+   * Builder for {@link ThriftRPCServer}. Defaults: wildcard address with port 0 (a random port is picked at
+   * startup), 2 io threads, and max(1, number of cpu cores - 2) worker threads.
+   */
+  public static final class Builder<I> {
+    private final Class<I> serviceType;
+    private String name;
+    private InetSocketAddress bindAddress = new InetSocketAddress(0);
+    private int ioThreads = 2;
+    // Clamped to 1: the constructor rejects counts <= 0, which "cores - 2" yields on 1- and 2-core hosts.
+    private int workerThreads = Math.max(1, Runtime.getRuntime().availableProcessors() - 2);
+    private int maxReadBufferBytes = 16 * 1024 * 1024; // 16Mb
+
+    private Builder(Class<I> serviceType) {
+      this.serviceType = serviceType;
+      this.name = serviceType.getSimpleName();
+    }
+
+    public Builder<I> setName(String name) {
+      this.name = name;
+      return this;
+    }
+
+    public Builder<I> setHost(String host) {
+      this.bindAddress = new InetSocketAddress(host, bindAddress.getPort());
+      return this;
+    }
+
+    public Builder<I> setPort(int port) {
+      this.bindAddress = new InetSocketAddress(bindAddress.getHostName(), port);
+      return this;
+    }
+
+    public Builder<I> setIOThreads(int count) {
+      this.ioThreads = count;
+      return this;
+    }
+
+    public Builder<I> setWorkerThreads(int count) {
+      this.workerThreads = count;
+      return this;
+    }
+
+    public Builder<I> setMaxReadBufferBytes(int maxReadBufferBytes) {
+      this.maxReadBufferBytes = maxReadBufferBytes;
+      return this;
+    }
+
+    public <T extends RPCServiceHandler> ThriftRPCServer<T, I> build(T serviceHandler) {
+      return new ThriftRPCServer<T, I>(bindAddress, ioThreads, workerThreads, maxReadBufferBytes,
+                                       serviceHandler, serviceType, name);
+    }
+  }
+
+  /**
+   * Creates a ThriftRPCServer with the given parameters.
+   *
+   * @param bindAddress The socket address for the server to listen on. If {@code null}, it'll be bound to a random
+   *                    port on localhost.
+   * @param ioThreads Number of io threads; must be > 0.
+   * @param workerThreads Number of worker threads; must be > 0.
+   * @param serviceHandler Handler for handling client requests.
+   */
+  @SuppressWarnings("unchecked")
+  private ThriftRPCServer(InetSocketAddress bindAddress, int ioThreads,
+                          int workerThreads, int maxReadBufferBytes,
+                          T serviceHandler, Class<I> serviceType, String name) {
+    Preconditions.checkArgument(ioThreads > 0, "IO threads must be > 0.");
+    Preconditions.checkArgument(workerThreads > 0, "Worker threads must be > 0.");
+
+    this.bindAddress = bindAddress;
+    this.ioThreads = ioThreads;
+    this.workerThreads = workerThreads;
+    this.maxReadBufferBytes = maxReadBufferBytes;
+    this.serviceHandler = serviceHandler;
+    this.name = name;
+    this.processor = createProcessor((Class<T>) serviceHandler.getClass(), serviceType);
+  }
+
+  public InetSocketAddress getBindAddress() {
+    return bindAddress;
+  }
+
+  @Override
+  protected void startUp() throws Exception {
+    // Determines the address and port to listen on
+    InetSocketAddress listenOn = bindAddress;
+    if (listenOn == null || listenOn.getPort() <= 0) {
+      int port = Networks.getRandomPort();
+      if (listenOn == null) {
+        listenOn = new InetSocketAddress("localhost", port);
+      } else {
+        listenOn = new InetSocketAddress(listenOn.getAddress(), port);
+      }
+    }
+    bindAddress = listenOn;
+
+    executor = new ThreadPoolExecutor(0, workerThreads,
+                                      60L, TimeUnit.SECONDS,
+                                      new SynchronousQueue<Runnable>(),
+                                      Threads.createDaemonThreadFactory(String.format("%s-rpc-%%d", name)),
+                                      new ThreadPoolExecutor.CallerRunsPolicy());
+    serviceHandler.init();
+
+    TThreadedSelectorServerWithFix.Args args =
+      new TThreadedSelectorServerWithFix.Args(new TNonblockingServerSocket(listenOn))
+        .selectorThreads(ioThreads)
+        .protocolFactory(new TBinaryProtocol.Factory())
+        .transportFactory(new TFramedTransport.Factory())
+        .processor(processor)
+        .executorService(executor);
+
+    // ENG-443 - Set the max read buffer size. This is important as it
+    // prevents the server from throwing an OOME when someone telnets to the
+    // port it's running on.
+    args.maxReadBufferBytes = maxReadBufferBytes;
+    server = new TThreadedSelectorServerWithFix(args);
+    LOG.info("Starting RPC server for {}", name);
+  }
+
+  @Override
+  protected void shutDown() throws Exception {
+    serviceHandler.destroy();
+    executor.shutdownNow();
+    LOG.info("RPC server for {} stopped.", name);
+  }
+
+  @Override
+  protected void triggerShutdown() {
+    LOG.info("Request to stop RPC server for {}", name);
+    server.stop();
+  }
+
+  @Override
+  protected void run() throws Exception {
+    LOG.info("Running RPC server for {}", name);
+    server.serve();
+    LOG.info("Done running RPC server for {}", name);
+  }
+
+  @SuppressWarnings("unchecked")
+  private TProcessor createProcessor(final Class<T> handlerType, Class<I> serviceType) {
+    // Scan the classes declared in the thrift service for its TProcessor and its "Iface" interface.
+    Class<? extends TProcessor> processorClass = null;
+    Class<?> ifaceClass = null;
+    for (Class<?> nested : serviceType.getDeclaredClasses()) {
+      if (TProcessor.class.isAssignableFrom(nested)) {
+        processorClass = (Class<? extends TProcessor>) nested;
+        continue;
+      }
+      if (nested.isInterface() && "Iface".equals(nested.getSimpleName())) {
+        ifaceClass = nested;
+      }
+    }
+    String serviceName = serviceType.getName();
+    Preconditions.checkArgument(processorClass != null,
+                                "Missing TProcessor, %s is not a valid thrift service.", serviceName);
+    Preconditions.checkArgument(ifaceClass != null,
+                                "Missing Iface, %s is not a valid thrift service.", serviceName);
+    // Only handlers that implement the Iface themselves are supported; anything else is rejected.
+    if (!ifaceClass.isAssignableFrom(handlerType)) {
+      throw new IllegalArgumentException("Unsupported handler type.");
+    }
+    return createProxyProcessor(handlerType, processorClass, ifaceClass);
+  }
+
+  // Wraps the handler in a dynamic proxy of the Iface and passes that proxy to the processor's constructor.
+  private TProcessor createProxyProcessor(final Class<T> handlerType,
+                                          Class<? extends TProcessor> processorType, Class<?> ifaceType) {
+    try {
+      // Map from Iface method to handlerType method to save reflection lookup
+      ImmutableMap.Builder<Method, Method> builder = ImmutableMap.builder();
+      for (Method method : ifaceType.getMethods()) {
+        Method handlerMethod = handlerType.getMethod(method.getName(), method.getParameterTypes());
+        if (!handlerMethod.isAccessible()) {
+          handlerMethod.setAccessible(true);
+        }
+        builder.put(method, handlerMethod);
+      }
+      final Map<Method, Method> methods = builder.build();
+
+      Object proxy = Proxy.newProxyInstance(ifaceType.getClassLoader(),
+                                            new Class[]{ifaceType}, new InvocationHandler() {
+        @Override
+        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+          // equals/hashCode/toString are dispatched here too but are not Iface methods, so they have no
+          // mapping; run them on the handler itself instead of failing with a NullPointerException.
+          Method target = methods.get(method);
+          try {
+            return (target != null ? target : method).invoke(serviceHandler, args);
+          } catch (InvocationTargetException e) {
+            // Rethrow what the handler threw so callers see the original exception, not the wrapper.
+            throw e.getCause() != null ? e.getCause() : e;
+          }
+        }
+      });
+
+      return processorType.getConstructor(ifaceType).newInstance(proxy);
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/rpc/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/rpc/package-info.java b/tephra-core/src/main/java/org/apache/tephra/rpc/package-info.java
new file mode 100644
index 0000000..96a5fe5
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/rpc/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains classes for writing RPC servers and clients in a simple manner.
+ */
+package org.apache.tephra.rpc;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/ConfigModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/ConfigModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/ConfigModule.java
new file mode 100644
index 0000000..b8ae111
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/ConfigModule.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.runtime;
+
+import com.google.inject.AbstractModule;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Provides Guice bindings for {@link Configuration}.
+ */
+public final class ConfigModule extends AbstractModule {
+
+  private final Configuration configuration;
+
+  public ConfigModule(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  protected void configure() {
+    bind(Configuration.class).toInstance(configuration);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/DiscoveryModules.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/DiscoveryModules.java b/tephra-core/src/main/java/org/apache/tephra/runtime/DiscoveryModules.java
new file mode 100644
index 0000000..b4e2d1b
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/DiscoveryModules.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.runtime;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Module;
+import com.google.inject.PrivateModule;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import org.apache.twill.common.Cancellable;
+import org.apache.twill.discovery.Discoverable;
+import org.apache.twill.discovery.DiscoveryService;
+import org.apache.twill.discovery.DiscoveryServiceClient;
+import org.apache.twill.discovery.InMemoryDiscoveryService;
+import org.apache.twill.discovery.ServiceDiscovered;
+import org.apache.twill.discovery.ZKDiscoveryService;
+import org.apache.twill.zookeeper.ZKClientService;
+
+/**
+ * Provides access to Google Guice modules for in-memory, single-node, and distributed operation for
+ * {@link DiscoveryService} and {@link DiscoveryServiceClient}.
+ */
+public final class DiscoveryModules {
+
+  public Module getInMemoryModules() {
+    return new InMemoryDiscoveryModule();
+  }
+
+  public Module getSingleNodeModules() {
+    return new InMemoryDiscoveryModule();
+  }
+
+  public Module getDistributedModules() {
+    return new ZKDiscoveryModule();
+  }
+
+  private static final class InMemoryDiscoveryModule extends AbstractModule {
+
+    // kept static to ensure a single instance across the JVM
+    private static final InMemoryDiscoveryService IN_MEMORY_DISCOVERY_SERVICE = new InMemoryDiscoveryService();
+
+    @Override
+    protected void configure() {
+      InMemoryDiscoveryService discovery = IN_MEMORY_DISCOVERY_SERVICE;
+      bind(DiscoveryService.class).toInstance(discovery);
+      bind(DiscoveryServiceClient.class).toInstance(discovery);
+    }
+  }
+
+  private static final class ZKDiscoveryModule extends PrivateModule {
+
+    @Override
+    protected void configure() {
+      expose(DiscoveryService.class);
+      expose(DiscoveryServiceClient.class);
+    }
+
+    @Provides
+    @Singleton
+    private ZKDiscoveryService providesZKDiscoveryService(ZKClientService zkClient) {
+      return new ZKDiscoveryService(zkClient);
+    }
+
+    @Provides
+    @Singleton
+    private DiscoveryService providesDiscoveryService(final ZKClientService zkClient,
+                                                      final ZKDiscoveryService delegate) {
+      return new DiscoveryService() {
+        @Override
+        public Cancellable register(Discoverable discoverable) {
+          if (!zkClient.isRunning()) {
+            zkClient.startAndWait();
+          }
+          return delegate.register(discoverable);
+        }
+      };
+    }
+
+    @Provides
+    @Singleton
+    private DiscoveryServiceClient providesDiscoveryServiceClient(final ZKClientService zkClient,
+                                                                  final ZKDiscoveryService delegate) {
+      return new DiscoveryServiceClient() {
+        @Override
+        public ServiceDiscovered discover(String s) {
+          if (!zkClient.isRunning()) {
+            zkClient.startAndWait();
+          }
+          return delegate.discover(s);
+        }
+      };
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionClientModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionClientModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionClientModule.java
new file mode 100644
index 0000000..6689d05
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionClientModule.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.runtime;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.distributed.PooledClientProvider;
+import org.apache.tephra.distributed.ThreadLocalClientProvider;
+import org.apache.tephra.distributed.ThriftClientProvider;
+import org.apache.twill.discovery.DiscoveryServiceClient;
+
+/**
+ * Provides Guice binding for {@link ThriftClientProvider}.
+ */
+public class TransactionClientModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    bind(ThriftClientProvider.class).toProvider(ThriftClientProviderSupplier.class);
+  }
+
+  /**
+   * Provides implementation of {@link ThriftClientProvider}
+   * based on configuration.
+   */
+  @Singleton
+  private static final class ThriftClientProviderSupplier implements Provider<ThriftClientProvider> {
+
+    private final Configuration cConf;
+    private DiscoveryServiceClient discoveryServiceClient;
+
+    @Inject
+    ThriftClientProviderSupplier(Configuration cConf) {
+      this.cConf = cConf;
+    }
+
+    @Inject(optional = true)
+    void setDiscoveryServiceClient(DiscoveryServiceClient discoveryServiceClient) {
+      this.discoveryServiceClient = discoveryServiceClient;
+    }
+
+    @Override
+    public ThriftClientProvider get() {
+      // Look up the configured provider name, passing the default constant as the fallback value.
+      String providerName = cConf.get(TxConstants.Service.CFG_DATA_TX_CLIENT_PROVIDER,
+                                      TxConstants.Service.DEFAULT_DATA_TX_CLIENT_PROVIDER);
+      if ("pool".equals(providerName)) {
+        return new PooledClientProvider(cConf, discoveryServiceClient);
+      }
+      if ("thread-local".equals(providerName)) {
+        return new ThreadLocalClientProvider(cConf, discoveryServiceClient);
+      }
+      // Neither known provider name matched:
+      // fail with the offending name in the message.
+      String message = "Unknown Transaction Service Client Provider '" + providerName + "'.";
+      throw new IllegalArgumentException(message);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java
new file mode 100644
index 0000000..aaf3534
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.runtime;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Singleton;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.google.inject.name.Names;
+import org.apache.tephra.DefaultTransactionExecutor;
+import org.apache.tephra.TransactionExecutor;
+import org.apache.tephra.TransactionExecutorFactory;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.distributed.TransactionServiceClient;
+import org.apache.tephra.metrics.DefaultMetricsCollector;
+import org.apache.tephra.metrics.MetricsCollector;
+import org.apache.tephra.persist.HDFSTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+
+/**
+ * Guice bindings for running in distributed mode on a cluster.
+ */
+final class TransactionDistributedModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    bind(SnapshotCodecProvider.class).in(Singleton.class);
+    bind(TransactionStateStorage.class).annotatedWith(Names.named("persist"))
+      .to(HDFSTransactionStateStorage.class).in(Singleton.class);
+    bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class).in(Singleton.class);
+
+    bind(TransactionManager.class).in(Singleton.class);
+    bind(TransactionSystemClient.class).to(TransactionServiceClient.class).in(Singleton.class);
+    bind(MetricsCollector.class).to(DefaultMetricsCollector.class).in(Singleton.class);
+
+    install(new FactoryModuleBuilder()
+        .implement(TransactionExecutor.class, DefaultTransactionExecutor.class)
+        .build(TransactionExecutorFactory.class));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java
new file mode 100644
index 0000000..de7678a
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.runtime;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Singleton;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import org.apache.tephra.DefaultTransactionExecutor;
+import org.apache.tephra.TransactionExecutor;
+import org.apache.tephra.TransactionExecutorFactory;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.metrics.MetricsCollector;
+import org.apache.tephra.metrics.TxMetricsCollector;
+import org.apache.tephra.persist.NoOpTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+
+/**
+ * Guice bindings for running completely in-memory (no persistence).  This should only be used for
+ * test classes, as the transaction state cannot be recovered in the case of a failure.
+ */
+public class TransactionInMemoryModule extends AbstractModule {
+  public TransactionInMemoryModule() {
+  }
+
+  @Override
+  protected void configure() {
+    bind(SnapshotCodecProvider.class).in(Singleton.class);
+    bind(TransactionStateStorage.class).to(NoOpTransactionStateStorage.class).in(Singleton.class);
+    bind(TransactionManager.class).in(Singleton.class);
+    bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Singleton.class);
+    // no metrics output for in-memory
+    bind(MetricsCollector.class).to(TxMetricsCollector.class);
+
+    install(new FactoryModuleBuilder()
+              .implement(TransactionExecutor.class, DefaultTransactionExecutor.class)
+              .build(TransactionExecutorFactory.class));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java
new file mode 100644
index 0000000..7d0b663
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.runtime;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Singleton;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.google.inject.name.Names;
+import org.apache.tephra.DefaultTransactionExecutor;
+import org.apache.tephra.TransactionExecutor;
+import org.apache.tephra.TransactionExecutorFactory;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.inmemory.InMemoryTxSystemClient;
+import org.apache.tephra.metrics.DefaultMetricsCollector;
+import org.apache.tephra.metrics.MetricsCollector;
+import org.apache.tephra.persist.LocalFileTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.snapshot.SnapshotCodecProvider;
+
+/**
+ * Guice bindings for running in single-node mode (persistence to local disk and in-memory client).
+ */
+final class TransactionLocalModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    bind(SnapshotCodecProvider.class).in(Singleton.class);
+    bind(TransactionStateStorage.class).annotatedWith(Names.named("persist"))
+      .to(LocalFileTransactionStateStorage.class).in(Singleton.class);
+    bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class).in(Singleton.class);
+
+    bind(TransactionManager.class).in(Singleton.class);
+    bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Singleton.class);
+    bind(MetricsCollector.class).to(DefaultMetricsCollector.class);
+
+    install(new FactoryModuleBuilder()
+              .implement(TransactionExecutor.class, DefaultTransactionExecutor.class)
+              .build(TransactionExecutorFactory.class));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java
new file mode 100644
index 0000000..a3fe1c1
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.runtime;
+
+import com.google.inject.Module;
+
+/**
+ * Provides access to Google Guice modules for in-memory, single-node, and distributed operation.
+ * <p>
+ * Every accessor constructs and returns a new module instance on each call; nothing is cached here.
+ * The returned modules are {@code TransactionInMemoryModule}, {@code TransactionLocalModule} and
+ * {@code TransactionDistributedModule} respectively.
+ */
+public class TransactionModules {
+  public TransactionModules() {
+  }
+
+  public Module getInMemoryModules() {
+    return new TransactionInMemoryModule(); // in-memory operation
+  }
+
+  public Module getSingleNodeModules() {
+    return new TransactionLocalModule(); // single-node operation
+  }
+
+  public Module getDistributedModules() {
+    return new TransactionDistributedModule(); // distributed operation
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionStateStorageProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionStateStorageProvider.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionStateStorageProvider.java
new file mode 100644
index 0000000..5456553
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionStateStorageProvider.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tephra.runtime;
+
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+import com.google.inject.name.Names;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.persist.NoOpTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+
+/**
+ * A provider for {@link TransactionStateStorage} that provides different
+ * {@link TransactionStateStorage} implementation based on configuration.
+ * <p>
+ * When {@link TxConstants.Manager#CFG_DO_PERSIST} is {@code true} (which is also the value assumed when the
+ * key is not set), {@link #get()} asks the injector for the {@link TransactionStateStorage} binding annotated
+ * with the name {@code "persist"}. Otherwise it asks the injector for a {@link NoOpTransactionStateStorage}.
+ * The decision is made on every call to {@link #get()}; this class itself holds no storage instance.
+ */
+@Singleton
+public final class TransactionStateStorageProvider implements Provider<TransactionStateStorage> {
+
+  private final Configuration cConf; // read on each get() to decide which storage to return
+  private final Injector injector; // used to look up the selected storage implementation
+
+  @Inject
+  TransactionStateStorageProvider(Configuration cConf, Injector injector) {
+    this.cConf = cConf;
+    this.injector = injector;
+  }
+
+  @Override
+  public TransactionStateStorage get() {
+    if (cConf.getBoolean(TxConstants.Manager.CFG_DO_PERSIST, true)) { // persistence is on unless disabled
+      return injector.getInstance(Key.get(TransactionStateStorage.class, Names.named("persist")));
+    } else {
+      return injector.getInstance(NoOpTransactionStateStorage.class);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/ZKModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/ZKModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/ZKModule.java
new file mode 100644
index 0000000..da3e019
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/ZKModule.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.runtime;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.zookeeper.TephraZKClientService;
+import org.apache.twill.zookeeper.RetryStrategies;
+import org.apache.twill.zookeeper.ZKClient;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.apache.twill.zookeeper.ZKClientServices;
+import org.apache.twill.zookeeper.ZKClients;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Provides Guice binding to {@link ZKClient} and {@link ZKClientService}.
+ * <p>
+ * {@link ZKClient} is bound to {@link ZKClientService}, which in turn comes from a singleton provider method.
+ * That method reads the ZooKeeper quorum from {@link TxConstants.Service#CFG_DATA_TX_ZOOKEEPER_QUORUM},
+ * falling back to {@link TxConstants.HBase#ZOOKEEPER_QUORUM} when the first key yields {@code null}, and reads
+ * the session timeout from {@link TxConstants.HBase#ZK_SESSION_TIMEOUT} (default
+ * {@link TxConstants.HBase#DEFAULT_ZK_SESSION_TIMEOUT}). The resulting {@link TephraZKClientService} is wrapped
+ * with {@code ZKClients.retryOnFailure} (using {@code RetryStrategies.exponentialDelay(500, 2000, MILLISECONDS)}),
+ * then {@code ZKClients.reWatchOnExpire}, then {@code ZKClientServices.delegate}.
+ * <p>
+ * NOTE(review): if neither quorum key is configured the quorum string presumably stays {@code null} when the
+ * client is constructed; confirm how {@link TephraZKClientService} treats that.
+ */
+public class ZKModule extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    /**
+     * ZKClientService is provided by the provider method
+     * {@link #provideZKClientService(org.apache.hadoop.conf.Configuration)}.
+     */
+    bind(ZKClient.class).to(ZKClientService.class);
+  }
+
+  @Provides
+  @Singleton
+  private ZKClientService provideZKClientService(Configuration conf) {
+    String zkStr = conf.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM);
+    if (zkStr == null) {
+      // Default to HBase one.
+      zkStr = conf.get(TxConstants.HBase.ZOOKEEPER_QUORUM);
+    }
+
+    int timeOut = conf.getInt(TxConstants.HBase.ZK_SESSION_TIMEOUT, TxConstants.HBase.DEFAULT_ZK_SESSION_TIMEOUT);
+    ZKClientService zkClientService = new TephraZKClientService(zkStr, timeOut, null,
+                                                                ArrayListMultimap.<String, byte[]>create());
+    return ZKClientServices.delegate( // outermost wrapper: exposes the decorated client as a ZKClientService
+      ZKClients.reWatchOnExpire(
+        ZKClients.retryOnFailure(zkClientService, RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS)
+        )
+      )
+    );
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/BinaryDecoder.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/BinaryDecoder.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/BinaryDecoder.java
new file mode 100644
index 0000000..a1cd6dd
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/BinaryDecoder.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.snapshot;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A decoder to help read snapshots in binary format.
+ * <p>
+ * Integers are read as variable-length, zig-zag encoded values: each byte contributes its low 7 bits,
+ * least-significant group first, and a byte greater than {@code 0x7f} means another byte follows.
+ * The decoder keeps no state besides the wrapped stream and reads from it one value at a time.
+ */
+public final class BinaryDecoder {
+
+  private final InputStream input;
+
+  /**
+   * @param input Stream to read from.
+   */
+  public BinaryDecoder(InputStream input) {
+    this.input = input;
+  }
+
+  /**
+   * Read one int from the input.
+   * <p>
+   * Bytes are consumed until one with the high bit clear is seen; the accumulated value is then
+   * zig-zag decoded, so the lowest bit of the raw value carries the sign.
+   * <p>
+   * NOTE(review): the number of continuation bytes is not bounded here, so a malformed stream is not
+   * rejected by this method.
+   * @return the read number
+   * @throws java.io.IOException If there is IO error.
+   * @throws java.io.EOFException If end of file reached.
+   */
+  public int readInt() throws IOException {
+    int val = 0;
+    int shift = 0;
+    int b = readByte();
+    while (b > 0x7f) { // high bit set: more bytes follow
+      val ^= (b & 0x7f) << shift;
+      shift += 7;
+      b = readByte();
+    }
+    val ^= b << shift; // last byte, high bit clear
+    return (val >>> 1) ^ -(val & 1); // undo zig-zag encoding
+  }
+
+  /**
+   * Read one long int from the input.
+   * <p>
+   * Same scheme as {@link #readInt()}, but the 7-bit groups are accumulated into a {@code long}.
+   * @return the read number
+   * @throws java.io.IOException If there is IO error.
+   * @throws java.io.EOFException If end of file reached.
+   */
+  public long readLong() throws IOException {
+    long val = 0;
+    int shift = 0;
+    int b = readByte();
+    while (b > 0x7f) { // high bit set: more bytes follow
+      val ^= (long) (b & 0x7f) << shift;
+      shift += 7;
+      b = readByte();
+    }
+    val ^= (long) b << shift; // last byte, high bit clear
+    return (val >>> 1) ^ -(val & 1); // undo zig-zag encoding
+  }
+
+  /**
+   * Read a byte sequence. First read an int to indicate how many bytes to read, then that many bytes.
+   * <p>
+   * The stream is read repeatedly until the whole array is filled, since a single read may return
+   * fewer bytes than requested.
+   * <p>
+   * NOTE(review): a negative decoded length reaches the array allocation unchecked and would surface as a
+   * {@link NegativeArraySizeException} rather than an {@link IOException}.
+   * @return the read bytes as a byte array
+   * @throws java.io.IOException If there is IO error.
+   * @throws java.io.EOFException If end of file reached.
+   */
+  public byte[] readBytes() throws IOException {
+    int toRead = readInt();
+    byte[] bytes = new byte[toRead];
+    while (toRead > 0) {
+      int byteRead = input.read(bytes, bytes.length - toRead, toRead); // continue after the bytes already read
+      if (byteRead == -1) {
+        throw new EOFException();
+      }
+      toRead -= byteRead;
+    }
+    return bytes;
+  }
+
+  /**
+   * Reads a single byte value.
+   *
+   * @return The byte value read, in the range 0 to 255 as returned by {@link InputStream#read()}.
+   * @throws java.io.IOException If there is IO error.
+   * @throws java.io.EOFException If end of file reached.
+   */
+  private int readByte() throws IOException {
+    int b = input.read();
+    if (b == -1) {
+      throw new EOFException();
+    }
+    return b;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/BinaryEncoder.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/BinaryEncoder.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/BinaryEncoder.java
new file mode 100644
index 0000000..c4ce0a8
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/BinaryEncoder.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.snapshot;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An encoder to help encode snapshots in binary format.
+ * <p>
+ * Integers are written zig-zag encoded and then split into 7-bit groups, least-significant group first;
+ * every byte except the last has its high bit ({@code 0x80}) set. All write methods return this encoder
+ * so that calls can be chained.
+ */
+public final class BinaryEncoder {
+
+  private final OutputStream output;
+
+  /**
+   * @param output stream to write to
+   */
+  public BinaryEncoder(OutputStream output) {
+    this.output = output;
+  }
+
+  /**
+   * Write a single int value.
+   * @param i the value to write
+   * @return this encoder
+   * @throws java.io.IOException If there is IO error.
+   */
+  public BinaryEncoder writeInt(int i) throws IOException {
+    // Compute the zig-zag value. First double the value and flip the bit if the input is negative.
+    int val = (i << 1) ^ (i >> 31);
+
+    if ((val & ~0x7f) != 0) { // does not fit in 7 bits: emit continuation bytes
+      output.write(0x80 | val & 0x7f);
+      val >>>= 7; // unsigned shift, so val is non-negative from here on
+      while (val > 0x7f) {
+        output.write(0x80 | val & 0x7f);
+        val >>>= 7;
+      }
+    }
+    output.write(val); // final byte, high bit clear
+
+    return this;
+  }
+
+  /**
+   * Write a single long int value.
+   * @param l the value to write
+   * @return this encoder
+   * @throws java.io.IOException If there is IO error.
+   */
+  public BinaryEncoder writeLong(long l) throws IOException {
+    // Compute the zig-zag value. First double the value and flip the bit if the input is negative.
+    long val = (l << 1) ^ (l >> 63);
+
+    if ((val & ~0x7f) != 0) { // does not fit in 7 bits: emit continuation bytes
+      output.write((int) (0x80 | val & 0x7f));
+      val >>>= 7; // unsigned shift, so val is non-negative from here on
+      while (val > 0x7f) {
+        output.write((int) (0x80 | val & 0x7f));
+        val >>>= 7;
+      }
+    }
+    output.write((int) val); // final byte, high bit clear
+
+    return this;
+  }
+
+  /**
+   * Write a sequence of bytes. First writes the number of bytes, then the bytes themselves.
+   * <p>
+   * The length is written through {@link #writeLong(long)}. Because an array length is a non-negative int,
+   * the bytes produced are the same ones {@link #writeInt(int)} would produce for that length.
+   * @param bytes the bytes to write; must not be {@code null}
+   * @return this encoder
+   * @throws java.io.IOException If there is IO error.
+   */
+  public BinaryEncoder writeBytes(byte[] bytes) throws IOException {
+    writeLong(bytes.length);
+    output.write(bytes, 0, bytes.length);
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/DefaultSnapshotCodec.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/DefaultSnapshotCodec.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/DefaultSnapshotCodec.java
new file mode 100644
index 0000000..4a94c74
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/DefaultSnapshotCodec.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.snapshot;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.tephra.ChangeId;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.persist.TransactionSnapshot;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Handles serialization/deserialization of a {@link TransactionSnapshot}
+ * and its elements to {@code byte[]}.
+ * <p>
+ * A snapshot is written in this order: timestamp, read pointer and write pointer (each as a long), the
+ * invalid transaction list, the in-progress transaction map, the committing change sets and finally the
+ * committed change sets. Every collection is written as its size followed by its entries (skipped entirely
+ * when the collection is empty) and is always terminated by a zero. The decoders keep reading
+ * size-prefixed groups until they see that zero.
+ * <p>
+ * {@code encodeInProgress} and {@code decodeInProgress} are {@code protected} so that subclasses can change
+ * how in-progress transactions are represented. I/O failures are logged and rethrown through
+ * {@link Throwables#propagate(Throwable)}.
+ * @deprecated This codec is now deprecated and is replaced by {@link SnapshotCodecV2}.
+ */
+@Deprecated
+public class DefaultSnapshotCodec implements SnapshotCodec {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultSnapshotCodec.class);
+
+  @Override
+  public int getVersion() {
+    return 1;
+  }
+
+  @Override
+  public void encode(OutputStream out, TransactionSnapshot snapshot) {
+    try {
+      BinaryEncoder encoder = new BinaryEncoder(out);
+
+      // the field order below defines the format and must mirror the decode methods
+      encoder.writeLong(snapshot.getTimestamp());
+      encoder.writeLong(snapshot.getReadPointer());
+      encoder.writeLong(snapshot.getWritePointer());
+      encodeInvalid(encoder, snapshot.getInvalid());
+      encodeInProgress(encoder, snapshot.getInProgress());
+      encodeChangeSets(encoder, snapshot.getCommittingChangeSets());
+      encodeChangeSets(encoder, snapshot.getCommittedChangeSets());
+
+    } catch (IOException e) {
+      LOG.error("Unable to serialize transaction state: ", e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public TransactionSnapshot decode(InputStream in) {
+    BinaryDecoder decoder = new BinaryDecoder(in);
+
+    try {
+      // reads the leading fields from the same stream; the change sets follow directly after them
+      TransactionVisibilityState minTxSnapshot = decodeTransactionVisibilityState(in);
+      NavigableMap<Long, Set<ChangeId>> committing = decodeChangeSets(decoder);
+      NavigableMap<Long, Set<ChangeId>> committed = decodeChangeSets(decoder);
+      return new TransactionSnapshot(minTxSnapshot.getTimestamp(), minTxSnapshot.getReadPointer(),
+                                     minTxSnapshot.getWritePointer(), minTxSnapshot.getInvalid(),
+                                     minTxSnapshot.getInProgress(), committing, committed);
+    } catch (IOException e) {
+      LOG.error("Unable to deserialize transaction state: ", e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  @Override
+  public TransactionVisibilityState decodeTransactionVisibilityState(InputStream in) {
+    BinaryDecoder decoder = new BinaryDecoder(in);
+    try {
+      // stops after the in-progress map; the change sets are left unread on the stream
+      long timestamp = decoder.readLong();
+      long readPointer = decoder.readLong();
+      long writePointer = decoder.readLong();
+      Collection<Long> invalid = decodeInvalid(decoder);
+      NavigableMap<Long, TransactionManager.InProgressTx> inProgress = decodeInProgress(decoder);
+      return new TransactionSnapshot(timestamp, readPointer, writePointer, invalid, inProgress);
+    } catch (IOException e) {
+      LOG.error("Unable to deserialize transaction state: ", e);
+      throw Throwables.propagate(e);
+    }
+  }
+
+  private void encodeInvalid(BinaryEncoder encoder, Collection<Long> invalid) throws IOException {
+    if (!invalid.isEmpty()) { // an empty list is written as just the terminating zero
+      encoder.writeInt(invalid.size());
+      for (long invalidTx : invalid) {
+        encoder.writeLong(invalidTx);
+      }
+    }
+    encoder.writeInt(0); // zero denotes end of list as per AVRO spec
+  }
+
+  private Collection<Long> decodeInvalid(BinaryDecoder decoder) throws IOException {
+    int size = decoder.readInt();
+    Collection<Long> invalid = Lists.newArrayListWithCapacity(size); // sized for the first group only
+    while (size != 0) { // zero denotes end of list as per AVRO spec
+      for (int remaining = size; remaining > 0; --remaining) {
+        invalid.add(decoder.readLong());
+      }
+      size = decoder.readInt();
+    }
+    return invalid;
+  }
+
+  protected void encodeInProgress(BinaryEncoder encoder, Map<Long, TransactionManager.InProgressTx> inProgress)
+    throws IOException {
+
+    if (!inProgress.isEmpty()) { // an empty map is written as just the terminating zero
+      encoder.writeInt(inProgress.size());
+      for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) {
+        encoder.writeLong(entry.getKey()); // tx id
+        encoder.writeLong(entry.getValue().getExpiration());
+        encoder.writeLong(entry.getValue().getVisibilityUpperBound());
+      }
+    }
+    encoder.writeInt(0); // zero denotes end of list as per AVRO spec
+  }
+
+  protected NavigableMap<Long, TransactionManager.InProgressTx> decodeInProgress(BinaryDecoder decoder)
+    throws IOException {
+
+    int size = decoder.readInt();
+    NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
+    while (size != 0) { // zero denotes end of list as per AVRO spec
+      for (int remaining = size; remaining > 0; --remaining) {
+        long txId = decoder.readLong();
+        long expiration = decoder.readLong();
+        long visibilityUpperBound = decoder.readLong();
+        inProgress.put(txId, // values are passed as (visibilityUpperBound, expiration), not in read order
+                       new TransactionManager.InProgressTx(visibilityUpperBound, expiration));
+      }
+      size = decoder.readInt();
+    }
+    return inProgress;
+  }
+
+  private void encodeChangeSets(BinaryEncoder encoder, Map<Long, Set<ChangeId>> changes) throws IOException {
+    if (!changes.isEmpty()) { // an empty map is written as just the terminating zero
+      encoder.writeInt(changes.size());
+      for (Map.Entry<Long, Set<ChangeId>> entry : changes.entrySet()) {
+        encoder.writeLong(entry.getKey());
+        encodeChanges(encoder, entry.getValue()); // nested list with its own size and terminator
+      }
+    }
+    encoder.writeInt(0); // zero denotes end of list as per AVRO spec
+  }
+
+  private NavigableMap<Long, Set<ChangeId>> decodeChangeSets(BinaryDecoder decoder) throws IOException {
+    int size = decoder.readInt();
+    NavigableMap<Long, Set<ChangeId>> changeSets = new TreeMap<Long, Set<ChangeId>>();
+    while (size != 0) { // zero denotes end of list as per AVRO spec
+      for (int remaining = size; remaining > 0; --remaining) {
+        changeSets.put(decoder.readLong(), decodeChanges(decoder));
+      }
+      size = decoder.readInt();
+    }
+    return changeSets;
+  }
+
+  private void encodeChanges(BinaryEncoder encoder, Set<ChangeId> changes) throws IOException {
+    if (!changes.isEmpty()) { // an empty set is written as just the terminating zero
+      encoder.writeInt(changes.size());
+      for (ChangeId change : changes) {
+        encoder.writeBytes(change.getKey()); // length-prefixed key bytes
+      }
+    }
+    encoder.writeInt(0); // zero denotes end of list as per AVRO spec
+  }
+
+  private Set<ChangeId> decodeChanges(BinaryDecoder decoder) throws IOException {
+    int size = decoder.readInt();
+    HashSet<ChangeId> changes = Sets.newHashSetWithExpectedSize(size); // sized for the first group only
+    while (size != 0) { // zero denotes end of list as per AVRO spec
+      for (int remaining = size; remaining > 0; --remaining) {
+        changes.add(new ChangeId(decoder.readBytes()));
+      }
+      size = decoder.readInt();
+    }
+    // todo is there an immutable hash set?
+    return changes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodec.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodec.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodec.java
new file mode 100644
index 0000000..e2c5c16
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodec.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.snapshot;
+
+import org.apache.tephra.persist.TransactionSnapshot;
+import org.apache.tephra.persist.TransactionVisibilityState;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Interface to decode and encode a transaction snapshot. Each codec implements one version of the encoding.
+ * It need not include the version when encoding the snapshot.
+ * <p>
+ * The methods declare no checked exceptions, so implementations have to report I/O failures as unchecked
+ * exceptions.
+ */
+public interface SnapshotCodec {
+
+  /**
+   * @return the version of the encoding implemented by the codec.
+   */
+  int getVersion();
+
+  /**
+   * Encode a transaction snapshot into an output stream.
+   * @param out the output stream to write to
+   * @param snapshot the snapshot to encode
+   */
+  void encode(OutputStream out, TransactionSnapshot snapshot);
+
+  /**
+   * Decode a transaction snapshot from an input stream.
+   * @param in the input stream to read from
+   * @return the decoded snapshot
+   */
+  TransactionSnapshot decode(InputStream in);
+
+  /**
+   * Decode transaction visibility state from an input stream.
+   * This yields a {@link TransactionVisibilityState} rather than a full {@link TransactionSnapshot}.
+   * @param in the input stream to read from
+   * @return {@link TransactionVisibilityState}
+   */
+  TransactionVisibilityState decodeTransactionVisibilityState(InputStream in);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecProvider.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecProvider.java
new file mode 100644
index 0000000..3756846
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecProvider.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.snapshot;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.inject.Inject;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.persist.TransactionSnapshot;
+import org.apache.tephra.persist.TransactionVisibilityState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.SortedMap;
+import javax.annotation.Nonnull;
+
+/**
+ * Maintains the codecs for all known versions of the transaction snapshot encoding.
+ * <p>
+ * The provider is itself a {@link SnapshotCodec}. When encoding it writes the version of the current codec
+ * (the one with the highest registered version) in front of the snapshot and then delegates to that codec.
+ * When decoding it first reads the version from the stream and delegates to the codec registered for it.
+ */
+public class SnapshotCodecProvider implements SnapshotCodec {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SnapshotCodecProvider.class);
+
+  private final SortedMap<Integer, SnapshotCodec> codecs = Maps.newTreeMap(); // keyed by codec version
+
+  @Inject
+  public SnapshotCodecProvider(Configuration configuration) {
+    initialize(configuration);
+  }
+
+  /**
+   * Register all codecs specified in the configuration with this provider.
+   * There can only be one codec for a given version.
+   * <p>
+   * Class names are read from {@link TxConstants.Persist#CFG_TX_SNAPHOT_CODEC_CLASSES}. Names that cannot be
+   * loaded are logged and skipped; if no class could be loaded at all,
+   * {@link TxConstants.Persist#DEFAULT_TX_SNAPHOT_CODEC_CLASSES} is used instead. Classes that cannot be
+   * instantiated or cast to {@link SnapshotCodec} are logged and skipped as well. If two classes report the
+   * same version, the one processed later replaces the earlier one.
+   */
+  private void initialize(Configuration configuration) {
+    String[] codecClassNames = configuration.getTrimmedStrings(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES);
+    List<Class> codecClasses = Lists.newArrayList();
+    if (codecClassNames != null) {
+      for (String clsName : codecClassNames) {
+        try {
+          codecClasses.add(Class.forName(clsName));
+        } catch (ClassNotFoundException cnfe) {
+          LOG.warn("Unable to load class configured for " + TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES
+              + ": " + clsName, cnfe);
+        }
+      }
+    }
+
+    if (codecClasses.size() == 0) { // nothing configured, or nothing loadable: fall back to the defaults
+      codecClasses.addAll(Arrays.asList(TxConstants.Persist.DEFAULT_TX_SNAPHOT_CODEC_CLASSES));
+    }
+    for (Class<?> codecClass : codecClasses) {
+      try {
+        SnapshotCodec codec = (SnapshotCodec) (codecClass.newInstance()); // needs an accessible no-arg constructor
+        codecs.put(codec.getVersion(), codec);
+        LOG.debug("Using snapshot codec {} for snapshots of version {}", codecClass.getName(), codec.getVersion());
+      } catch (Exception e) {
+        LOG.warn("Error instantiating snapshot codec {}. Skipping.", codecClass.getName(), e);
+      }
+    }
+  }
+
+  /**
+   * Retrieve the codec for a particular version of the encoding.
+   * @param version the version of interest
+   * @return the corresponding codec
+   * @throws java.lang.IllegalArgumentException if the version is not known
+   */
+  @Nonnull
+  @VisibleForTesting
+  SnapshotCodec getCodecForVersion(int version) {
+    SnapshotCodec codec = codecs.get(version);
+    if (codec == null) {
+      throw new IllegalArgumentException(String.format("Version %d of snapshot encoding is not supported", version));
+    }
+    return codec;
+  }
+
+  /**
+   * Retrieve the current snapshot codec, that is, the codec with the highest known version.
+   * @return the current codec
+   * @throws java.lang.IllegalStateException if no codecs are registered
+   */
+  private SnapshotCodec getCurrentCodec() {
+    if (codecs.isEmpty()) {
+      throw new IllegalStateException(String.format("No codecs are registered."));
+    }
+    return codecs.get(codecs.lastKey()); // the map is sorted by version, so the last key is the highest
+  }
+
+  // Return the appropriate codec for the version in InputStream.
+  // This consumes the leading version int, leaving the stream positioned at the snapshot body.
+  private SnapshotCodec getCodec(InputStream in) {
+    BinaryDecoder decoder = new BinaryDecoder(in);
+    int persistedVersion;
+    try {
+      persistedVersion = decoder.readInt();
+    } catch (IOException e) {
+      LOG.error("Unable to read transaction state version: ", e);
+      throw Throwables.propagate(e);
+    }
+    return getCodecForVersion(persistedVersion);
+  }
+
+  @Override
+  public int getVersion() {
+    return getCurrentCodec().getVersion();
+  }
+
+  @Override
+  public TransactionSnapshot decode(InputStream in) {
+    return getCodec(in).decode(in); // version prefix first, then the versioned body
+  }
+
+  @Override
+  public TransactionVisibilityState decodeTransactionVisibilityState(InputStream in) {
+    return getCodec(in).decodeTransactionVisibilityState(in); // version prefix first, then the versioned body
+  }
+
+  @Override
+  public void encode(OutputStream out, TransactionSnapshot snapshot) {
+    SnapshotCodec codec = getCurrentCodec();
+    try {
+      new BinaryEncoder(out).writeInt(codec.getVersion()); // version prefix read back by getCodec()
+    } catch (IOException e) {
+      LOG.error("Unable to write transaction state version: ", e);
+      throw Throwables.propagate(e);
+    }
+    codec.encode(out, snapshot);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java
new file mode 100644
index 0000000..ccf026d
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.snapshot;
+
+import com.google.common.collect.Maps;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.persist.TransactionSnapshot;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+/**
+ * Handles serialization/deserialization of a {@link TransactionSnapshot}
+ * and its elements to {@code byte[]}, reporting codec version 2.
+ *
+ * <p>Only the in-progress transaction section is customized here; everything else is inherited from
+ * {@link DefaultSnapshotCodec}.</p>
+ */
+public class SnapshotCodecV2 extends DefaultSnapshotCodec {
+  @Override
+  public int getVersion() {
+    return 2;
+  }
+
+  /**
+   * Writes the in-progress transactions as a single count-prefixed block followed by a zero terminator.
+   * Each entry is written as: transaction id, expiration, visibility upper bound, transaction type ordinal.
+   * An empty map is written as the zero terminator alone.
+   *
+   * @param encoder the encoder to write to
+   * @param inProgress the in-progress transactions, keyed by transaction id
+   * @throws IOException if the encoder fails to write
+   */
+  @Override
+  protected void encodeInProgress(BinaryEncoder encoder, Map<Long, TransactionManager.InProgressTx> inProgress)
+    throws IOException {
+
+    if (!inProgress.isEmpty()) {
+      encoder.writeInt(inProgress.size());
+      for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) {
+        TransactionManager.InProgressTx tx = entry.getValue();
+        encoder.writeLong(entry.getKey()); // tx id
+        encoder.writeLong(tx.getExpiration());
+        encoder.writeLong(tx.getVisibilityUpperBound());
+        encoder.writeInt(tx.getType().ordinal());
+      }
+    }
+    encoder.writeInt(0); // zero denotes end of list as per AVRO spec
+  }
+
+  /**
+   * Reads the in-progress transactions written by {@link #encodeInProgress}: count-prefixed blocks of
+   * entries, repeated until a zero count is read. Every decoded transaction gets an empty list of
+   * checkpoint write pointers, since this format does not store them.
+   *
+   * @param decoder the decoder to read from
+   * @return the decoded transactions, keyed by transaction id
+   * @throws IOException if reading fails or a stored transaction type ordinal does not match any
+   *         {@link TransactionType} constant
+   */
+  @Override
+  protected NavigableMap<Long, TransactionManager.InProgressTx> decodeInProgress(BinaryDecoder decoder)
+    throws IOException {
+
+    // values() returns a fresh array copy on every call, so look it up once for the whole decode
+    TransactionType[] txTypes = TransactionType.values();
+    int size = decoder.readInt();
+    NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
+    while (size != 0) { // zero denotes end of list as per AVRO spec
+      for (int remaining = size; remaining > 0; --remaining) {
+        long txId = decoder.readLong();
+        long expiration = decoder.readLong();
+        long visibilityUpperBound = decoder.readLong();
+        int txTypeIdx = decoder.readInt();
+        // validate explicitly instead of relying on a caught ArrayIndexOutOfBoundsException
+        if (txTypeIdx < 0 || txTypeIdx >= txTypes.length) {
+          throw new IOException("Type enum ordinal value is out of range: " + txTypeIdx);
+        }
+        inProgress.put(txId,
+                       new TransactionManager.InProgressTx(visibilityUpperBound, expiration, txTypes[txTypeIdx],
+                           new LongArrayList()));
+      }
+      size = decoder.readInt();
+    }
+    return inProgress;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV3.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV3.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV3.java
new file mode 100644
index 0000000..1b9e2b3
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV3.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.snapshot;
+
+import org.apache.tephra.persist.TransactionSnapshot;
+
+/**
+ * Handles serialization/deserialization of a {@link TransactionSnapshot}
+ * and its elements to {@code byte[]}.
+ *
+ * <p>The serialization/deserialization of this codec is the same as that performed by {@link SnapshotCodecV2},
+ * but a new version number is used to allow easy migration from projects using deprecated codecs with
+ * conflicting version numbers.</p>
+ */
+public class SnapshotCodecV3 extends SnapshotCodecV2 {
+  /**
+   * Returns 3, the only thing this class changes relative to {@link SnapshotCodecV2}.
+   */
+  @Override
+  public int getVersion() {
+    return 3;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java
new file mode 100644
index 0000000..cadaa8e
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.snapshot;
+
+import com.google.common.collect.Maps;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.tephra.TransactionManager;
+import org.apache.tephra.TransactionType;
+import org.apache.tephra.persist.TransactionSnapshot;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NavigableMap;
+
+/**
+ * Handles serialization/deserialization of a {@link TransactionSnapshot}
+ * and its elements to {@code byte[]}, reporting codec version 4.
+ *
+ * <p>Compared to {@link SnapshotCodecV2}, each in-progress transaction entry is additionally followed by
+ * its checkpoint write pointers.</p>
+ */
+public class SnapshotCodecV4 extends SnapshotCodecV2 {
+  @Override
+  public int getVersion() {
+    return 4;
+  }
+
+  /**
+   * Writes the in-progress transactions as a single count-prefixed block followed by a zero terminator.
+   * Each entry is written as: transaction id, expiration, visibility upper bound, transaction type ordinal,
+   * and then the checkpoint write pointers as their own count-prefixed, zero-terminated list. A null or
+   * empty pointer list is written as the zero terminator alone.
+   *
+   * @param encoder the encoder to write to
+   * @param inProgress the in-progress transactions, keyed by transaction id
+   * @throws IOException if the encoder fails to write
+   */
+  @Override
+  protected void encodeInProgress(BinaryEncoder encoder, Map<Long, TransactionManager.InProgressTx> inProgress)
+      throws IOException {
+
+    if (!inProgress.isEmpty()) {
+      encoder.writeInt(inProgress.size());
+      for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) {
+        TransactionManager.InProgressTx tx = entry.getValue();
+        encoder.writeLong(entry.getKey()); // tx id
+        encoder.writeLong(tx.getExpiration());
+        encoder.writeLong(tx.getVisibilityUpperBound());
+        encoder.writeInt(tx.getType().ordinal());
+        // write checkpoint tx IDs
+        LongArrayList checkpointPointers = tx.getCheckpointWritePointers();
+        if (checkpointPointers != null && !checkpointPointers.isEmpty()) {
+          encoder.writeInt(checkpointPointers.size());
+          for (int i = 0; i < checkpointPointers.size(); i++) {
+            encoder.writeLong(checkpointPointers.getLong(i));
+          }
+        }
+        encoder.writeInt(0); // zero denotes end of the checkpoint pointer list
+      }
+    }
+    encoder.writeInt(0); // zero denotes end of list as per AVRO spec
+  }
+
+  /**
+   * Reads the in-progress transactions written by {@link #encodeInProgress}, including the checkpoint
+   * write pointers stored after each entry.
+   *
+   * @param decoder the decoder to read from
+   * @return the decoded transactions, keyed by transaction id
+   * @throws IOException if reading fails, if a stored transaction type ordinal does not match any
+   *         {@link TransactionType} constant, or if the leading checkpoint pointer count is negative
+   */
+  @Override
+  protected NavigableMap<Long, TransactionManager.InProgressTx> decodeInProgress(BinaryDecoder decoder)
+      throws IOException {
+
+    // values() returns a fresh array copy on every call, so look it up once for the whole decode
+    TransactionType[] txTypes = TransactionType.values();
+    int size = decoder.readInt();
+    NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap();
+    while (size != 0) { // zero denotes end of list as per AVRO spec
+      for (int remaining = size; remaining > 0; --remaining) {
+        long txId = decoder.readLong();
+        long expiration = decoder.readLong();
+        long visibilityUpperBound = decoder.readLong();
+        int txTypeIdx = decoder.readInt();
+        // validate explicitly instead of relying on a caught ArrayIndexOutOfBoundsException
+        if (txTypeIdx < 0 || txTypeIdx >= txTypes.length) {
+          throw new IOException("Type enum ordinal value is out of range: " + txTypeIdx);
+        }
+        // read checkpoint tx IDs
+        int checkpointPointerSize = decoder.readInt();
+        // the count sizes the list below; a negative value would otherwise escape as an unchecked
+        // IllegalArgumentException from the LongArrayList constructor
+        if (checkpointPointerSize < 0) {
+          throw new IOException("Checkpoint write pointer count is negative: " + checkpointPointerSize);
+        }
+        LongArrayList checkpointPointers = new LongArrayList(checkpointPointerSize);
+        while (checkpointPointerSize != 0) {
+          for (int checkpointRemaining = checkpointPointerSize; checkpointRemaining > 0; --checkpointRemaining) {
+            checkpointPointers.add(decoder.readLong());
+          }
+          checkpointPointerSize = decoder.readInt();
+        }
+        inProgress.put(txId,
+            new TransactionManager.InProgressTx(visibilityUpperBound, expiration, txTypes[txTypeIdx],
+                checkpointPointers));
+      }
+      size = decoder.readInt();
+    }
+    return inProgress;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/package-info.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/package-info.java
new file mode 100644
index 0000000..cf1a276
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains interfaces and implementations for encoding and decoding transaction snapshots.
+ */
+package org.apache.tephra.snapshot;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationFactory.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationFactory.java b/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationFactory.java
new file mode 100644
index 0000000..a13668c
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.util;
+
+import com.google.inject.Provider;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Provides {@code org.apache.hadoop.conf.Configuration} instances, constructed by the correct version used
+ * for the runtime.
+ *
+ * <p>Both {@code get} methods delegate to a {@link ConfigurationProvider} that is obtained once, when this
+ * factory is constructed.</p>
+ */
+public class ConfigurationFactory implements Provider<Configuration> {
+  /**
+   * Supplies the fully qualified {@link ConfigurationProvider} class name for each supported HBase version.
+   * NOTE(review): presumably the base class picks one of these names from the detected HBase version and
+   * instantiates it -- confirm against {@code HBaseVersionSpecificFactory}.
+   */
+  private static class ConfigurationProviderFactory extends HBaseVersionSpecificFactory<ConfigurationProvider> {
+    @Override
+    protected String getHBase96Classname() {
+      return "org.apache.tephra.hbase96.HBase96ConfigurationProvider";
+    }
+
+    @Override
+    protected String getHBase98Classname() {
+      return "org.apache.tephra.hbase98.HBase98ConfigurationProvider";
+    }
+
+    @Override
+    protected String getHBase10Classname() {
+      return "org.apache.tephra.hbase10.HBase10ConfigurationProvider";
+    }
+
+    @Override
+    protected String getHBase11Classname() {
+      return "org.apache.tephra.hbase11.HBase11ConfigurationProvider";
+    }
+
+    // Unlike the other versions, the class name here carries no "CDH" marker; only the package differs.
+    @Override
+    protected String getHBase10CDHClassname() {
+      return "org.apache.tephra.hbase10cdh.HBase10ConfigurationProvider";
+    }
+  }
+
+  // Resolved once per ConfigurationFactory instance, at construction time.
+  private final ConfigurationProvider provider = new ConfigurationProviderFactory().get();
+
+  /**
+   * Returns a new {@link org.apache.hadoop.conf.Configuration} instance from the HBase version-specific factory.
+   */
+  @Override
+  public Configuration get() {
+    return provider.get();
+  }
+
+  /**
+   * Returns a new {@link org.apache.hadoop.conf.Configuration} instance from the HBase version-specific factory.
+   *
+   * @param baseConf additional configuration properties to merge on to the classpath configuration
+   * @return the merged configuration
+   */
+  public Configuration get(Configuration baseConf) {
+    return provider.get(baseConf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationProvider.java b/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationProvider.java
new file mode 100644
index 0000000..e133c69
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationProvider.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.util;
+
+import com.google.inject.Provider;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Provides {@code Configuration} instances, constructed by the HBase version on which we are running.
+ *
+ * <p>Concrete subclasses supply both a no-argument variant and a variant that takes a base configuration.</p>
+ */
+public abstract class ConfigurationProvider implements Provider<Configuration> {
+  /**
+   * Returns a {@code Configuration} instance.
+   */
+  @Override
+  public abstract Configuration get();
+
+  /**
+   * Returns a {@code Configuration} instance derived from the given one.
+   * NOTE(review): how {@code baseConf} is combined with other settings is up to the subclass -- confirm
+   * against the implementations.
+   *
+   * @param baseConf the configuration to start from
+   */
+  public abstract Configuration get(Configuration baseConf);
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersion.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersion.java b/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersion.java
new file mode 100644
index 0000000..687e46d
--- /dev/null
+++ b/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersion.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.text.ParseException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Detects the currently loaded HBase version.  It is assumed that only one HBase version is loaded at a time,
+ * since using more than one HBase version within the same process will require classloader isolation anyway.
+ */
+public class HBaseVersion {
+  private static final String HBASE_94_VERSION = "0.94";
+  private static final String HBASE_96_VERSION = "0.96";
+  private static final String HBASE_98_VERSION = "0.98";
+  private static final String HBASE_10_VERSION = "1.0";
+  private static final String HBASE_11_VERSION = "1.1";
+  private static final String HBASE_12_VERSION = "1.2";
+  private static final String CDH_CLASSIFIER = "cdh";
+
+  private static final Logger LOG = LoggerFactory.getLogger(HBaseVersion.class);
+
+  /**
+   * Represents the major version of the HBase library that is currently loaded.
+   */
+  public enum Version {
+    HBASE_94("0.94"),
+    HBASE_96("0.96"),
+    HBASE_98("0.98"),
+    HBASE_10("1.0"),
+    HBASE_10_CDH("1.0-cdh"),
+    HBASE_11("1.1"),
+    HBASE_12_CDH("1.2-cdh"),
+    UNKNOWN("unknown");
+
+    final String majorVersion;
+
+    Version(String majorVersion) {
+      this.majorVersion = majorVersion;
+    }
+
+    public String getMajorVersion() {
+      return majorVersion;
+    }
+  }
+
+  private static Version currentVersion;
+  private static String versionString;
+  static {
+    try {
+      Class versionInfoClass = Class.forName("org.apache.hadoop.hbase.util.VersionInfo");
+      Method versionMethod = versionInfoClass.getMethod("getVersion");
+      versionString = (String) versionMethod.invoke(null);
+      if (versionString.startsWith(HBASE_94_VERSION)) {
+        currentVersion = Version.HBASE_94;
+      } else if (versionString.startsWith(HBASE_96_VERSION)) {
+        currentVersion = Version.HBASE_96;
+      } else if (versionString.startsWith(HBASE_98_VERSION)) {
+        currentVersion = Version.HBASE_98;
+      } else if (versionString.startsWith(HBASE_10_VERSION)) {
+        VersionNumber ver = VersionNumber.create(versionString);
+        if (ver.getClassifier() != null && ver.getClassifier().startsWith(CDH_CLASSIFIER)) {
+          currentVersion = Version.HBASE_10_CDH;
+        } else {
+          currentVersion = Version.HBASE_10;
+        }
+      } else if (versionString.startsWith(HBASE_11_VERSION)) {
+        currentVersion = Version.HBASE_11;
+      } else if (versionString.startsWith(HBASE_12_VERSION)) {
+        VersionNumber ver = VersionNumber.create(versionString);
+        if (ver.getClassifier() != null && ver.getClassifier().startsWith(CDH_CLASSIFIER)) {
+          currentVersion = Version.HBASE_12_CDH;
+        } else {
+          // CDH 5.7 comes with HBase version 1.2.0-CDH5.7.0. However currently there is no
+          // other hadoop distribution that uses HBase 1.2, so the version is set here to UNKNOWN.
+          currentVersion = Version.UNKNOWN;
+        }
+      } else {
+        currentVersion = Version.UNKNOWN;
+      }
+    } catch (Throwable e) {
+      // must be a class loading exception, HBase is not there
+      LOG.error("Unable to determine HBase version from string '{}', are HBase classes available?", versionString);
+      LOG.error("Exception was: ", e);
+      currentVersion = Version.UNKNOWN;
+    }
+  }
+
+  /**
+   * Returns the major version of the currently loaded HBase library.
+   */
+  public static Version get() {
+    return currentVersion;
+  }
+
+  /**
+   * Returns the full version string for the currently loaded HBase library.
+   */
+  public static String getVersionString() {
+    return versionString;
+  }
+
+  /**
+   * Prints out the HBase {@link Version} enum value for the current version of HBase on the classpath.
+   */
+  public static void main(String[] args) {
+    boolean verbose = args.length == 1 && "-v".equals(args[0]);
+    Version version = HBaseVersion.get();
+    System.out.println(version.getMajorVersion());
+    if (verbose) {
+      System.out.println("versionString=" + getVersionString());
+    }
+  }
+
+  /**
+   * Utility class to parse apart version number components.  The version string provided is expected to be in
+   * the format: major[.minor[.patch[.last]][-classifier][-SNAPSHOT]
+   *
+   * <p>Only the major version number is actually required.</p>
+   */
+  public static class VersionNumber {
+    private static final Pattern PATTERN =
+        Pattern.compile("(\\d+)(\\.(\\d+))?(\\.(\\d+))?(\\.(\\d+))?(\\-(?!SNAPSHOT)([^\\-]+))?(\\-SNAPSHOT)?");
+
+    private Integer major;
+    private Integer minor;
+    private Integer patch;
+    private Integer last;
+    private String classifier;
+    private boolean snapshot;
+
+    private VersionNumber(Integer major, Integer minor, Integer patch, Integer last,
+                          String classifier, boolean snapshot) {
+      this.major = major;
+      this.minor = minor;
+      this.patch = patch;
+      this.last = last;
+      this.classifier = classifier;
+      this.snapshot = snapshot;
+    }
+
+    public Integer getMajor() {
+      return major;
+    }
+
+    public Integer getMinor() {
+      return minor;
+    }
+
+    public Integer getPatch() {
+      return patch;
+    }
+
+    public Integer getLast() {
+      return last;
+    }
+
+    public String getClassifier() {
+      return classifier;
+    }
+
+    public boolean isSnapshot() {
+      return snapshot;
+    }
+
+    public static VersionNumber create(String versionString) throws ParseException {
+      Matcher matcher = PATTERN.matcher(versionString);
+      if (matcher.matches()) {
+        String majorString = matcher.group(1);
+        String minorString = matcher.group(3);
+        String patchString = matcher.group(5);
+        String last = matcher.group(7);
+        String classifier = matcher.group(9);
+        String snapshotString = matcher.group(10);
+        return new VersionNumber(new Integer(majorString),
+            minorString != null ? new Integer(minorString) : null,
+            patchString != null ? new Integer(patchString) : null,
+            last != null ? new Integer(last) : null,
+            classifier,
+            "-SNAPSHOT".equals(snapshotString));
+      }
+      throw new ParseException(
+          "Input string did not match expected pattern: major[.minor[.patch]][-classifier][-SNAPSHOT]", 0);
+    }
+  }
+}